kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4925: delay initial rebalance of consumer group
Date Thu, 04 May 2017 18:14:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 24e642342 -> ed59f742b


KAFKA-4925: delay initial rebalance of consumer group

Add new broker config, `group.initial.rebalance.delay.ms`, with a default of 3 seconds.
When a consumer creates a new group, set the group's state to InitialRebalance and delay the
rebalance by `min(group.initial.rebalance.delay.ms, rebalanceTimeout)`. As other members
join the group, further delay the rebalance by `min(group.initial.rebalance.delay.ms, remainingRebalanceTimeout)`.
Once `rebalanceTimeout` is reached, or no new members join the group within the delay, complete
the rebalance.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Ewen Cheslack-Postava, Guozhang Wang

Closes #2758 from dguy/kafka-4925


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed59f742
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed59f742
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed59f742

Branch: refs/heads/trunk
Commit: ed59f742b0b2c7920444d56c44d0bc79dbc8a2ad
Parents: 24e6423
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu May 4 11:14:28 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu May 4 11:14:28 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  |   2 +-
 .../kafka/coordinator/group/DelayedJoin.scala   |  41 +++++++-
 .../coordinator/group/GroupCoordinator.scala    |  31 ++++--
 .../kafka/coordinator/group/GroupMetadata.scala |   1 +
 .../main/scala/kafka/server/KafkaConfig.scala   |   5 +
 .../kafka/api/BaseConsumerTest.scala            |   1 +
 .../integration/kafka/api/BaseQuotaTest.scala   |   1 +
 .../kafka/api/ConsumerBounceTest.scala          |   1 +
 .../kafka/api/LegacyAdminClientTest.scala       |   1 +
 .../group/GroupCoordinatorResponseTest.scala    | 104 ++++++++++++++++---
 .../coordinator/group/GroupMetadataTest.scala   |  15 ++-
 .../unit/kafka/server/KafkaConfigTest.scala     |   1 +
 .../integration/utils/EmbeddedKafkaCluster.java |   1 +
 13 files changed, 183 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 14471da..16ec9ea 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -785,7 +785,7 @@ public class Protocol {
                                                                                         
        STRING),
                                                                                        new
Field("state",
                                                                                         
        STRING,
-                                                                                        
        "The current state of the group (one of: Dead, Stable, AwaitingSync, or PreparingRebalance,
or empty if there is no active group)"),
+                                                                                        
        "The current state of the group (one of: Dead, Stable, AwaitingSync, PreparingRebalance,
or empty if there is no active group)"),
                                                                                        new
Field("protocol_type",
                                                                                         
        STRING,
                                                                                         
        "The current group protocol type (will be empty if there is no active group)"),

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 06a47da..6a81242 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -17,7 +17,9 @@
 
 package kafka.coordinator.group
 
-import kafka.server.DelayedOperation
+import kafka.server.{DelayedOperation, DelayedOperationPurgatory, GroupKey}
+
+import scala.math.{max, min}
 
 /**
  * Delayed rebalance operations that are added to the purgatory when group is preparing for
rebalance
@@ -41,3 +43,40 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
   override def onExpiration() = coordinator.onExpireJoin()
   override def onComplete() = coordinator.onCompleteJoin(group)
 }
+
+/**
+  * Delayed rebalance operation that is added to the purgatory when a group is transitioning
from
+  * Empty to PreparingRebalance
+  *
+  * When onComplete is triggered we check if any new members have been added and if there
is still time remaining
+  * before the rebalance timeout. If both are true we then schedule a further delay. Otherwise
we complete the
+  * rebalance.
+  */
+private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
+                                        purgatory: DelayedOperationPurgatory[DelayedJoin],
+                                        group: GroupMetadata,
+                                        configuredRebalanceDelay: Int,
+                                        delayMs: Int,
+                                        remainingMs: Int) extends DelayedJoin(coordinator,
group, delayMs) {
+
+  override def tryComplete(): Boolean = false
+
+  override def onComplete(): Unit = {
+    group synchronized  {
+      if (group.newMemberAdded && remainingMs != 0) {
+        group.newMemberAdded = false
+        val delay = min(configuredRebalanceDelay, remainingMs)
+        val remaining = max(remainingMs - delayMs, 0)
+        purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
+          purgatory,
+          group,
+          configuredRebalanceDelay,
+          delay,
+          remaining
+        ), Seq(GroupKey(group.groupId)))
+      } else
+        super.onComplete()
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index e814717..f5b1a29 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Map, Seq, immutable}
+import scala.math.max
 
 
 /**
@@ -153,7 +154,6 @@ class GroupCoordinator(val brokerId: Int,
             // coordinator OR the group is in a transient unstable phase. Let the member
retry
             // joining without the specified member id,
             responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
-
           case PreparingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
               addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost,
protocolType, protocols, group, responseCallback)
@@ -616,11 +616,14 @@ class GroupCoordinator(val brokerId: Int,
                                     protocols: List[(String, Array[Byte])],
                                     group: GroupMetadata,
                                     callback: JoinCallback) = {
-    // use the client-id with a random id suffix as the member-id
     val memberId = clientId + "-" + group.generateMemberIdSuffix
     val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
     member.awaitingJoinCallback = callback
+    // update the newMemberAdded flag to indicate that the join group can be further delayed
+    if (group.is(PreparingRebalance) && group.generationId == 0)
+      group.newMemberAdded = true
+
     group.add(member)
     maybePrepareRebalance(group)
     member
@@ -647,11 +650,19 @@ class GroupCoordinator(val brokerId: Int,
     if (group.is(AwaitingSync))
       resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
 
+    val delayedRebalance = if (group.is(Empty))
+      new InitialDelayedJoin(this,
+        joinPurgatory,
+        group,
+        groupConfig.groupInitialRebalanceDelayMs,
+        groupConfig.groupInitialRebalanceDelayMs,
+        max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
+    else
+      new DelayedJoin(this, group, group.rebalanceTimeoutMs)
+
     group.transitionTo(PreparingRebalance)
     info("Preparing to restabilize group %s with old generation %s".format(group.groupId,
group.generationId))
 
-    val rebalanceTimeout = group.rebalanceTimeoutMs
-    val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
     val groupKey = GroupKey(group.groupId)
     joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
   }
@@ -707,7 +718,11 @@ class GroupCoordinator(val brokerId: Int,
           for (member <- group.allMemberMetadata) {
             assert(member.awaitingJoinCallback != null)
             val joinResult = JoinGroupResult(
-              members = if (member.memberId == group.leaderId) { group.currentMemberMetadata
} else { Map.empty },
+              members = if (member.memberId == group.leaderId) {
+                group.currentMemberMetadata
+              } else {
+                Map.empty
+              },
               memberId = member.memberId,
               generationId = group.generationId,
               subProtocol = group.protocol,
@@ -797,7 +812,8 @@ object GroupCoordinator {
             time: Time): GroupCoordinator = {
     val offsetConfig = this.offsetConfig(config)
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
-      groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
+      groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
+      groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)
 
     val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
       offsetConfig, replicaManager, zkUtils, time)
@@ -807,7 +823,8 @@ object GroupCoordinator {
 }
 
 case class GroupConfig(groupMinSessionTimeoutMs: Int,
-                       groupMaxSessionTimeoutMs: Int)
+                       groupMaxSessionTimeoutMs: Int,
+                       groupInitialRebalanceDelayMs: Int)
 
 case class JoinGroupResult(members: Map[String, Array[Byte]],
                            memberId: String,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index b433284..44f3f2b 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -151,6 +151,7 @@ private[group] class GroupMetadata(val groupId: String, initialState:
GroupState
   var generationId = 0
   var leaderId: String = null
   var protocol: String = null
+  var newMemberAdded: Boolean = false
 
   def is(groupState: GroupState) = state == groupState
   def not(groupState: GroupState) = state != groupState

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 47046ce..a941fb7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -143,6 +143,7 @@ object Defaults {
   /** ********* Group coordinator configuration ***********/
   val GroupMinSessionTimeoutMs = 6000
   val GroupMaxSessionTimeoutMs = 300000
+  val GroupInitialRebalanceDelayMs = 3000
 
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize
@@ -328,6 +329,7 @@ object KafkaConfig {
   /** ********* Group coordinator configuration ***********/
   val GroupMinSessionTimeoutMsProp = "group.min.session.timeout.ms"
   val GroupMaxSessionTimeoutMsProp = "group.max.session.timeout.ms"
+  val GroupInitialRebalanceDelayMsProp = "group.initial.rebalance.delay.ms"
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
   val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
@@ -568,6 +570,7 @@ object KafkaConfig {
   /** ********* Consumer coordinator configuration ***********/
   val GroupMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered consumers.
Shorter timeouts result in quicker failure detection at the cost of more frequent consumer
heartbeating, which can overwhelm broker resources."
   val GroupMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered consumers.
Longer timeouts give consumers more time to process messages in between heartbeats at the
cost of a longer time to detect failures."
+  val GroupInitialRebalanceDelayMsDoc = "The amount of time the group coordinator will wait
for more consumers to join a new group before performing the first rebalance. A longer delay
means potentially fewer rebalances, but increases the time until processing begins."
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an
offset commit"
   val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading
offsets into the cache."
@@ -772,6 +775,7 @@ object KafkaConfig {
       /** ********* Group coordinator configuration ***********/
       .define(GroupMinSessionTimeoutMsProp, INT, Defaults.GroupMinSessionTimeoutMs, MEDIUM,
GroupMinSessionTimeoutMsDoc)
       .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.GroupMaxSessionTimeoutMs, MEDIUM,
GroupMaxSessionTimeoutMsDoc)
+      .define(GroupInitialRebalanceDelayMsProp, INT, Defaults.GroupInitialRebalanceDelayMs,
MEDIUM, GroupInitialRebalanceDelayMsDoc)
 
       /** ********* Offset management configuration ***********/
       .define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc)
@@ -984,6 +988,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends
Abstra
   /** ********* Group coordinator configuration ***********/
   val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
   val groupMaxSessionTimeoutMs = getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)
+  val groupInitialRebalanceDelay = getInt(KafkaConfig.GroupInitialRebalanceDelayMsProp)
 
   /** ********* Offset management configuration ***********/
   val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index d3535c6..8d394c6 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -52,6 +52,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small
enough session timeout
   this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
+  this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index f21c1df..9fcdd9b 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -50,6 +50,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100")
   this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
+  this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0")
   this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
   this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index fcfe0da..6635d7f 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -49,6 +49,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't
want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small
enough session timeout
+  this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
   this.serverConfig.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
   this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 434a47f..ddac764 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -57,6 +57,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging
{
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't
want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small
enough session timeout
+  this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") // do
initial rebalance immediately
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
   this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
index 25fb713..b4aa56f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.coordinator.group
 
-import java.util.concurrent.TimeUnit
 
 import kafka.common.{OffsetAndMetadata, Topic}
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
@@ -25,17 +24,19 @@ import kafka.utils._
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{RecordBatch, MemoryRecords, TimestampType}
+import org.apache.kafka.common.record.{RecordBatch, MemoryRecords}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse}
 import org.easymock.{Capture, EasyMock, IAnswer}
+import java.util.concurrent.TimeUnit
+
 import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.junit.{After, Assert, Before, Test}
 import org.scalatest.junit.JUnitSuite
 
 import scala.collection._
 import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Await, Future, Promise, TimeoutException}
 
 /**
  * Test GroupCoordinator responses
@@ -57,6 +58,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   val ConsumerMaxSessionTimeout = 1000
   val DefaultRebalanceTimeout = 500
   val DefaultSessionTimeout = 500
+  val GroupInitialRebalanceDelay = 50
   var timer: MockTimer = null
   var groupCoordinator: GroupCoordinator = null
   var replicaManager: ReplicaManager = null
@@ -78,6 +80,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
     props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
     props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString)
+    // make two partitions of the group topic to make sure some partitions are not owned
by the coordinator
+    val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
+    ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
 
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
@@ -169,7 +175,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, "connect", protocols)
+    val otherJoinGroupResult = await(sendJoinGroup(groupId, otherMemberId, "connect", protocols),
1)
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error)
   }
 
@@ -179,11 +185,13 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, protocolType, List(("range", metadata)))
-    assertEquals(Errors.NONE, joinGroupResult.error)
+    val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, List(("range", metadata)))
 
     EasyMock.reset(replicaManager)
     val otherJoinGroupResult = joinGroup(groupId, otherMemberId, protocolType, List(("roundrobin",
metadata)))
+
+    val joinGroupResult = await(joinGroupFuture, 1)
+    assertEquals(Errors.NONE, joinGroupResult.error)
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error)
   }
 
@@ -196,7 +204,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE, joinGroupResult.error)
 
     EasyMock.reset(replicaManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, protocolType, protocols)
+    val otherJoinGroupResult = await(sendJoinGroup(groupId, otherMemberId, protocolType,
protocols), 1)
     assertEquals(Errors.UNKNOWN_MEMBER_ID, otherJoinGroupResult.error)
   }
 
@@ -552,7 +560,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     // this shouldn't cause a rebalance since protocol information hasn't changed
     EasyMock.reset(replicaManager)
-    val followerJoinResult = joinGroup(groupId, otherJoinResult.memberId, protocolType, protocols)
+    val followerJoinResult = await(sendJoinGroup(groupId, otherJoinResult.memberId, protocolType,
protocols), 1)
 
     assertEquals(Errors.NONE, followerJoinResult.error)
     assertEquals(nextGenerationId, followerJoinResult.generationId)
@@ -574,7 +582,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     // leader to push new assignments when local metadata changes
 
     EasyMock.reset(replicaManager)
-    val secondJoinResult = joinGroup(groupId, firstMemberId, protocolType, protocols)
+    val secondJoinResult = await(sendJoinGroup(groupId, firstMemberId, protocolType, protocols),
1)
 
     assertEquals(Errors.NONE, secondJoinResult.error)
     assertNotEquals(firstGenerationId, secondJoinResult.generationId)
@@ -852,7 +860,9 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(Errors.NONE, syncGroupError)
 
     EasyMock.reset(replicaManager)
-    val otherJoinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, protocols)
+    val otherJoinGroupResult = await(joinGroupFuture, 1)
+
     val nextGenerationId = otherJoinGroupResult.generationId
     val otherJoinGroupError = otherJoinGroupResult.error
     assertEquals(2, nextGenerationId)
@@ -989,6 +999,76 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertTrue(summary.members.forall(_.assignment.isEmpty))
   }
 
+  @Test
+  def shouldDelayInitialRebalanceByGroupInitialRebalanceDelayOnEmptyGroup() {
+    val firstJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType,
protocols)
+    timer.advanceClock(GroupInitialRebalanceDelay / 2)
+    verifyDelayedTaskNotCompleted(firstJoinFuture)
+    timer.advanceClock((GroupInitialRebalanceDelay / 2) + 1)
+    val joinGroupResult = await(firstJoinFuture, 1)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+  }
+
+  private def verifyDelayedTaskNotCompleted(firstJoinFuture: Future[JoinGroupResult]) = {
+    try {
+      await(firstJoinFuture, 1)
+      Assert.fail("should have timed out as rebalance delay not expired")
+    } catch {
+      case _: TimeoutException => // ok
+    }
+  }
+
+  @Test
+  def shouldResetRebalanceDelayWhenNewMemberJoinsGroupInInitialRebalance() {
+    val rebalanceTimeout = GroupInitialRebalanceDelay * 3
+    val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
protocolType, protocols, rebalanceTimeout)
+    EasyMock.reset(replicaManager)
+    timer.advanceClock(GroupInitialRebalanceDelay - 1)
+    val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
protocolType, protocols, rebalanceTimeout)
+    EasyMock.reset(replicaManager)
+    timer.advanceClock(2)
+
+    // advance past initial rebalance delay and make sure that tasks
+    // haven't been completed
+    timer.advanceClock(GroupInitialRebalanceDelay / 2 + 1)
+    verifyDelayedTaskNotCompleted(firstMemberJoinFuture)
+    verifyDelayedTaskNotCompleted(secondMemberJoinFuture)
+    // advance clock beyond updated delay and make sure the
+    // tasks have completed
+    timer.advanceClock(GroupInitialRebalanceDelay / 2)
+    val firstResult = await(firstMemberJoinFuture, 1)
+    val secondResult = await(secondMemberJoinFuture, 1)
+    assertEquals(Errors.NONE, firstResult.error)
+    assertEquals(Errors.NONE, secondResult.error)
+  }
+
+  @Test
+  def shouldDelayRebalanceUptoRebalanceTimeout() {
+    val rebalanceTimeout = GroupInitialRebalanceDelay * 2
+    val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
protocolType, protocols, rebalanceTimeout)
+    EasyMock.reset(replicaManager)
+    val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
protocolType, protocols, rebalanceTimeout)
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    EasyMock.reset(replicaManager)
+    val thirdMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
protocolType, protocols, rebalanceTimeout)
+    timer.advanceClock(GroupInitialRebalanceDelay)
+    EasyMock.reset(replicaManager)
+
+    verifyDelayedTaskNotCompleted(firstMemberJoinFuture)
+    verifyDelayedTaskNotCompleted(secondMemberJoinFuture)
+    verifyDelayedTaskNotCompleted(thirdMemberJoinFuture)
+
+    // advance clock beyond rebalanceTimeout
+    timer.advanceClock(1)
+
+    val firstResult = await(firstMemberJoinFuture, 1)
+    val secondResult = await(secondMemberJoinFuture, 1)
+    val thirdResult = await(thirdMemberJoinFuture, 1)
+    assertEquals(Errors.NONE, firstResult.error)
+    assertEquals(Errors.NONE, secondResult.error)
+    assertEquals(Errors.NONE, thirdResult.error)
+  }
+
   private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
     val responsePromise = Promise[JoinGroupResult]
     val responseFuture = responsePromise.future
@@ -1077,7 +1157,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
                         sessionTimeout: Int = DefaultSessionTimeout,
                         rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult
= {
     val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, rebalanceTimeout,
sessionTimeout)
-    timer.advanceClock(10)
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
     // should only have to wait as long as session timeout, but allow some extra time in
case of an unexpected delay
     Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index ab893cd..e62c0d3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -120,12 +120,25 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test(expected = classOf[IllegalStateException])
+  def testEmptyToStableIllegalTransition() {
+    group.transitionTo(Stable)
+  }
+
+  @Test
   def testStableToStableIllegalTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(AwaitingSync)
     group.transitionTo(Stable)
+    try {
+      group.transitionTo(Stable)
+      fail("should have failed due to illegal transition")
+    } catch {
+      case e: IllegalStateException => // ok
+    }
   }
 
   @Test(expected = classOf[IllegalStateException])
-  def testStableToAwaitingSyncIllegalTransition() {
+  def testEmptyToAwaitingSyncIllegalTransition() {
     group.transitionTo(AwaitingSync)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 23c11aa..bf89533 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -614,6 +614,7 @@ class KafkaConfigTest {
         case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_boolean", "0")
         case KafkaConfig.GroupMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.GroupMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
+        case KafkaConfig.GroupInitialRebalanceDelayMsProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number", "0")
         case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number", "0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed59f742/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 25dd0af..70d271c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -67,6 +67,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(),
2 * 1024 * 1024L);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(),
0);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(),
(short) 1);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
 


Mime
View raw message