kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5565: Add a broker metric specifying the number of consumer group rebalances in progress
Date Mon, 09 Oct 2017 16:28:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a1ea53606 -> 6d6080f13


KAFKA-5565: Add a broker metric specifying the number of consumer group rebalances in progress

…up rebalances in progress

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3506 from cmccabe/KAFKA-5565


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6d6080f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6d6080f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6d6080f1

Branch: refs/heads/trunk
Commit: 6d6080f13633508c57e48cbc12788ce643af4953
Parents: a1ea536
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Mon Oct 9 09:28:42 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Oct 9 09:28:42 2017 -0700

----------------------------------------------------------------------
 .../common/requests/DescribeGroupsResponse.java |  2 +-
 .../common/requests/JoinGroupResponse.java      | 14 ++++++
 .../kafka/admin/ConsumerGroupCommand.scala      |  2 +-
 .../coordinator/group/GroupCoordinator.scala    | 22 ++++-----
 .../kafka/coordinator/group/GroupMetadata.scala | 14 +++---
 .../group/GroupMetadataManager.scala            | 50 +++++++++++++++++---
 .../scala/kafka/metrics/KafkaMetricsGroup.scala |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  2 +-
 .../group/GroupCoordinatorTest.scala            |  4 +-
 .../group/GroupMetadataManagerTest.scala        | 29 +++++++++++-
 .../coordinator/group/GroupMetadataTest.scala   | 30 ++++++------
 11 files changed, 125 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 61c5a36..313e113 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -64,7 +64,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
     private static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(
             ERROR_CODE,
             new Field(GROUP_ID_KEY_NAME, STRING),
-            new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one
of: Dead, Stable, AwaitingSync, " +
+            new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one
of: Dead, Stable, CompletingRebalance, " +
                     "PreparingRebalance, or empty if there is no active group)"),
             new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol type (will
be empty if there is no active group)"),
             new Field(PROTOCOL_KEY_NAME, STRING, "The current group protocol (only provided
if the group is Stable)"),

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 56491eb..d9f987b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -202,4 +203,17 @@ public class JoinGroupResponse extends AbstractResponse {
 
         return struct;
     }
+
+    @Override
+    public String toString() {
+        return "JoinGroupResponse" +
+            "(throttleTimeMs=" + throttleTimeMs +
+            ", error=" + error +
+            ", generationId=" + generationId +
+            ", groupProtocol=" + groupProtocol +
+            ", memberId=" + memberId +
+            ", leaderId=" + leaderId +
+            ", members=" + ((members == null) ? "null" :
+                Utils.join(members.keySet(), ",")) + ")";
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 2120657..d71f062 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -89,7 +89,7 @@ object ConsumerGroupCommand extends Logging {
                 case Some("Empty") =>
                   System.err.println(s"Consumer group '$groupId' has no active members.")
                   printAssignment(assignments, true)
-                case Some("PreparingRebalance") | Some("AwaitingSync") =>
+                case Some("PreparingRebalance") | Some("CompletingRebalance") =>
                   System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
                   printAssignment(assignments, true)
                 case Some("Stable") =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index bb59bcd..dd4d52d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -169,7 +169,7 @@ class GroupCoordinator(val brokerId: Int,
               updateMemberAndRebalance(group, member, protocols, responseCallback)
             }
 
-          case AwaitingSync =>
+          case CompletingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
               addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost,
protocolType, protocols, group, responseCallback)
             } else {
@@ -261,7 +261,7 @@ class GroupCoordinator(val brokerId: Int,
           case PreparingRebalance =>
             responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
 
-          case AwaitingSync =>
+          case CompletingRebalance =>
             group.get(memberId).awaitingSyncCallback = responseCallback
 
             // if this is the leader, then we can attempt to persist state and transition
to stable
@@ -275,9 +275,9 @@ class GroupCoordinator(val brokerId: Int,
               groupManager.storeGroup(group, assignment, (error: Errors) => {
                 group synchronized {
                   // another member may have joined the group while we were awaiting this
callback,
-                  // so we must ensure we are still in the AwaitingSync state and the same
generation
+                  // so we must ensure we are still in the CompletingRebalance state and
the same generation
                   // when it gets invoked. if we have transitioned to another state, then
do nothing
-                  if (group.is(AwaitingSync) && generationId == group.generationId)
{
+                  if (group.is(CompletingRebalance) && generationId == group.generationId)
{
                     if (error != Errors.NONE) {
                       resetAndPropagateAssignmentError(group, error)
                       maybePrepareRebalance(group)
@@ -361,7 +361,7 @@ class GroupCoordinator(val brokerId: Int,
               case Empty =>
                 responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
-              case AwaitingSync =>
+              case CompletingRebalance =>
                 if (!group.has(memberId))
                   responseCallback(Errors.UNKNOWN_MEMBER_ID)
                 else
@@ -456,7 +456,7 @@ class GroupCoordinator(val brokerId: Int,
         // the group is only using Kafka to store offsets
         // Also, for transactional offset commits we don't need to validate group membership
and the generation.
         groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId,
producerEpoch)
-      } else if (group.is(AwaitingSync)) {
+      } else if (group.is(CompletingRebalance)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
       } else if (!group.has(memberId)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
@@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int,
           }
           joinPurgatory.checkAndComplete(GroupKey(group.groupId))
 
-        case Stable | AwaitingSync =>
+        case Stable | CompletingRebalance =>
           for (member <- group.allMemberMetadata) {
             if (member.awaitingSyncCallback != null) {
               member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR)
@@ -574,13 +574,13 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]])
{
-    assert(group.is(AwaitingSync))
+    assert(group.is(CompletingRebalance))
     group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
     propagateAssignment(group, Errors.NONE)
   }
 
   private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) {
-    assert(group.is(AwaitingSync))
+    assert(group.is(CompletingRebalance))
     group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
     propagateAssignment(group, error)
   }
@@ -674,7 +674,7 @@ class GroupCoordinator(val brokerId: Int,
 
   private def prepareRebalance(group: GroupMetadata) {
     // if any members are awaiting sync, cancel their request and have them rejoin
-    if (group.is(AwaitingSync))
+    if (group.is(CompletingRebalance))
       resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
 
     val delayedRebalance = if (group.is(Empty))
@@ -700,7 +700,7 @@ class GroupCoordinator(val brokerId: Int,
     group.remove(member.memberId)
     group.currentState match {
       case Dead | Empty =>
-      case Stable | AwaitingSync => maybePrepareRebalance(group)
+      case Stable | CompletingRebalance => maybePrepareRebalance(group)
       case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 18096bb..c4e071d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -35,7 +35,7 @@ private[group] sealed trait GroupState
  *         park join group requests from new or existing members until all expected members
have joined
  *         allow offset commits from previous generation
  *         allow offset fetch requests
- * transition: some members have joined by the timeout => AwaitingSync
+ * transition: some members have joined by the timeout => CompletingRebalance
  *             all members have left the group => Empty
  *             group is removed by partition emigration => Dead
  */
@@ -54,7 +54,7 @@ private[group] case object PreparingRebalance extends GroupState
  *             member failure detected => PreparingRebalance
  *             group is removed by partition emigration => Dead
  */
-private[group] case object AwaitingSync extends GroupState
+private[group] case object CompletingRebalance extends GroupState
 
 /**
  * Group is stable
@@ -105,10 +105,10 @@ private[group] case object Empty extends GroupState
 
 private object GroupMetadata {
   private val validPreviousStates: Map[GroupState, Set[GroupState]] =
-    Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync, Empty, Dead),
-      AwaitingSync -> Set(PreparingRebalance),
-      Stable -> Set(AwaitingSync),
-      PreparingRebalance -> Set(Stable, AwaitingSync, Empty),
+    Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
+      CompletingRebalance -> Set(PreparingRebalance),
+      Stable -> Set(CompletingRebalance),
+      PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
       Empty -> Set(PreparingRebalance))
 }
 
@@ -256,7 +256,7 @@ private[group] class GroupMetadata(val groupId: String, initialState:
GroupState
     if (members.nonEmpty) {
       generationId += 1
       protocol = selectProtocol
-      transitionTo(AwaitingSync)
+      transitionTo(CompletingRebalance)
     } else {
       generationId += 1
       protocol = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 7519dc4..8ef4894 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -82,19 +82,57 @@ class GroupMetadataManager(brokerId: Int,
 
   this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "
 
-  newGauge("NumOffsets",
+  private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
+    removeMetric(name)
+    newGauge(name, gauge)
+  }
+
+  recreateGauge("NumOffsets",
     new Gauge[Int] {
       def value = groupMetadataCache.values.map(group => {
         group synchronized { group.numOffsets }
       }).sum
-    }
-  )
+    })
 
-  newGauge("NumGroups",
+  recreateGauge("NumGroups",
     new Gauge[Int] {
       def value = groupMetadataCache.size
-    }
-  )
+    })
+
+  recreateGauge("NumGroupsPreparingRebalance",
+    new Gauge[Int] {
+      def value(): Int = groupMetadataCache.values.count(group => {
+        group synchronized { group.is(PreparingRebalance) }
+      })
+    })
+
+  recreateGauge("NumGroupsCompletingRebalance",
+    new Gauge[Int] {
+      def value(): Int = groupMetadataCache.values.count(group => {
+        group synchronized { group.is(CompletingRebalance) }
+      })
+    })
+
+  recreateGauge("NumGroupsStable",
+    new Gauge[Int] {
+      def value(): Int = groupMetadataCache.values.count(group => {
+        group synchronized { group.is(Stable) }
+      })
+    })
+
+  recreateGauge("NumGroupsDead",
+    new Gauge[Int] {
+      def value(): Int = groupMetadataCache.values.count(group => {
+        group synchronized { group.is(Dead) }
+      })
+    })
+
+  recreateGauge("NumGroupsEmpty",
+    new Gauge[Int] {
+      def value(): Int = groupMetadataCache.values.count(group => {
+        group synchronized { group.is(Empty) }
+      })
+    })
 
   def enableMetadataExpiration() {
     scheduler.startup()

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index b9ab486..46db8bc 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -40,7 +40,7 @@ trait KafkaMetricsGroup extends Logging {
    * @param tags Additional attributes which mBean will have.
    * @return Sanitized metric name object.
    */
-  protected def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName
= {
+  def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName =
{
     val klass = this.getClass
     val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 755f500..0b5d43e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -234,7 +234,7 @@ class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
     extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
   val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
 
-  override protected def metricName(name: String, metricTags: scala.collection.Map[String,
String]): MetricName = {
+  override def metricName(name: String, metricTags: scala.collection.Map[String, String]):
MetricName = {
     explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3fed45d..c9f2ec6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1049,7 +1049,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
-  def testCommitOffsetInAwaitingSync() {
+  def testCommitOffsetInCompletingRebalance() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
@@ -1232,7 +1232,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, error)
     assertEquals(protocolType, summary.protocolType)
     assertEquals(GroupCoordinator.NoProtocol, summary.protocol)
-    assertEquals(AwaitingSync.toString, summary.state)
+    assertEquals(CompletingRebalance.toString, summary.state)
     assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId))
     assertTrue(summary.members.forall(_.metadata.isEmpty))
     assertTrue(summary.members.forall(_.assignment.isEmpty))

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 46a1878..8e5b593 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -23,7 +23,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.log.{Log, LogAppendInfo}
 import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.TestUtils.fail
-import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
+import kafka.utils.{KafkaScheduler, Logging, MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
@@ -34,6 +34,8 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{Before, Test}
 import java.nio.ByteBuffer
 
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.internals.Topic
 
 import scala.collection.JavaConverters._
@@ -1392,4 +1394,29 @@ class GroupMetadataManagerTest {
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
   }
 
+  private def getGauge(manager: GroupMetadataManager, name: String): Gauge[Int]  = {
+    Metrics.defaultRegistry().allMetrics().get(manager.metricName(name, Map.empty)).asInstanceOf[Gauge[Int]]
+  }
+
+  private def expectMetrics(manager: GroupMetadataManager,
+                            expectedNumGroups: Int,
+                            expectedNumGroupsPreparingRebalance: Int,
+                            expectedNumGroupsCompletingRebalance: Int): Unit = {
+    assertEquals(expectedNumGroups, getGauge(manager, "NumGroups").value)
+    assertEquals(expectedNumGroupsPreparingRebalance, getGauge(manager, "NumGroupsPreparingRebalance").value)
+    assertEquals(expectedNumGroupsCompletingRebalance, getGauge(manager, "NumGroupsCompletingRebalance").value)
+  }
+
+  @Test
+  def testMetrics() {
+    groupMetadataManager.cleanupGroupMetadata()
+    expectMetrics(groupMetadataManager, 0, 0, 0)
+    val group = new GroupMetadata("foo2", Stable)
+    groupMetadataManager.addGroup(group)
+    expectMetrics(groupMetadataManager, 1, 0, 0)
+    group.transitionTo(PreparingRebalance)
+    expectMetrics(groupMetadataManager, 1, 1, 0)
+    group.transitionTo(CompletingRebalance)
+    expectMetrics(groupMetadataManager, 1, 0, 1)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 2db6603..ca62bf8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -49,9 +49,9 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test
-  def testCanRebalanceWhenAwaitingSync() {
+  def testCanRebalanceWhenCompletingRebalance() {
     group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
+    group.transitionTo(CompletingRebalance)
     assertTrue(group.canRebalance)
   }
 
@@ -82,9 +82,9 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test
-  def testAwaitingSyncToPreparingRebalanceTransition() {
+  def testAwaitingRebalanceToPreparingRebalanceTransition() {
     group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
+    group.transitionTo(CompletingRebalance)
     group.transitionTo(PreparingRebalance)
     assertState(group, PreparingRebalance)
   }
@@ -112,9 +112,9 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test
-  def testAwaitingSyncToStableTransition() {
+  def testAwaitingRebalanceToStableTransition() {
     group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
+    group.transitionTo(CompletingRebalance)
     group.transitionTo(Stable)
     assertState(group, Stable)
   }
@@ -127,7 +127,7 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testStableToStableIllegalTransition() {
     group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
+    group.transitionTo(CompletingRebalance)
     group.transitionTo(Stable)
     try {
       group.transitionTo(Stable)
@@ -138,8 +138,8 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test(expected = classOf[IllegalStateException])
-  def testEmptyToAwaitingSyncIllegalTransition() {
-    group.transitionTo(AwaitingSync)
+  def testEmptyToAwaitingRebalanceIllegalTransition() {
+    group.transitionTo(CompletingRebalance)
   }
 
   @Test(expected = classOf[IllegalStateException])
@@ -155,10 +155,10 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test(expected = classOf[IllegalStateException])
-  def testAwaitingSyncToAwaitingSyncIllegalTransition() {
+  def testAwaitingRebalanceToAwaitingRebalanceIllegalTransition() {
     group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
-    group.transitionTo(AwaitingSync)
+    group.transitionTo(CompletingRebalance)
+    group.transitionTo(CompletingRebalance)
   }
 
   def testDeadToDeadIllegalTransition() {
@@ -183,10 +183,10 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test(expected = classOf[IllegalStateException])
-  def testDeadToAwaitingSyncIllegalTransition() {
+  def testDeadToAwaitingRebalanceIllegalTransition() {
     group.transitionTo(PreparingRebalance)
     group.transitionTo(Dead)
-    group.transitionTo(AwaitingSync)
+    group.transitionTo(CompletingRebalance)
   }
 
   @Test
@@ -466,7 +466,7 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   private def assertState(group: GroupMetadata, targetState: GroupState) {
-    val states: Set[GroupState] = Set(Stable, PreparingRebalance, AwaitingSync, Dead)
+    val states: Set[GroupState] = Set(Stable, PreparingRebalance, CompletingRebalance, Dead)
     val otherStates = states - targetState
     otherStates.foreach { otherState =>
       assertFalse(group.is(otherState))


Mime
View raw message