kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2160; remove watcher list if empty in purgatory; remove join-group purgatory in coordinator; reviewed by Jun Rao
Date Wed, 20 May 2015 01:43:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 41ba26273 -> 0ad646620


KAFKA-2160; remove watcher list if empty in purgatory; remove join-group purgatory in coordinator; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0ad64662
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0ad64662
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0ad64662

Branch: refs/heads/trunk
Commit: 0ad646620614f257a0485ef3ff69ad8b83eeaf8d
Parents: 41ba262
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue May 19 18:43:18 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue May 19 18:43:18 2015 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/ConsumerCoordinator.scala |  93 ++++------
 .../coordinator/ConsumerGroupMetadata.scala     | 131 ++++++++++++++
 .../kafka/coordinator/ConsumerMetadata.scala    |  49 ++++++
 .../kafka/coordinator/CoordinatorMetadata.scala |  10 +-
 .../kafka/coordinator/DelayedHeartbeat.scala    |   4 +-
 .../kafka/coordinator/DelayedJoinGroup.scala    |  39 -----
 .../kafka/coordinator/DelayedRebalance.scala    |   2 +-
 .../main/scala/kafka/coordinator/Group.scala    | 131 --------------
 .../kafka/coordinator/HeartbeatBucket.scala     |  43 -----
 .../scala/kafka/server/DelayedOperation.scala   |  43 ++++-
 .../other/kafka/TestPurgatoryPerformance.scala  |  25 ++-
 .../coordinator/ConsumerGroupMetadataTest.scala | 172 +++++++++++++++++++
 .../unit/kafka/coordinator/GroupTest.scala      | 172 -------------------
 13 files changed, 448 insertions(+), 466 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 6f05488..af06ad4 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -48,7 +48,6 @@ class ConsumerCoordinator(val config: KafkaConfig,
   private val isActive = new AtomicBoolean(false)
 
   private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
-  private var joinGroupPurgatory: DelayedOperationPurgatory[DelayedJoinGroup] = null
   private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null
   private var coordinatorMetadata: CoordinatorMetadata = null
 
@@ -63,7 +62,6 @@ class ConsumerCoordinator(val config: KafkaConfig,
   def startup() {
     info("Starting up.")
     heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
-    joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup]("JoinGroup", config.brokerId)
     rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
     coordinatorMetadata = new CoordinatorMetadata(config, zkClient, maybePrepareRebalance)
     isActive.set(true)
@@ -79,7 +77,6 @@ class ConsumerCoordinator(val config: KafkaConfig,
     isActive.set(false)
     coordinatorMetadata.shutdown()
     heartbeatPurgatory.shutdown()
-    joinGroupPurgatory.shutdown()
     rebalancePurgatory.shutdown()
     info("Shutdown complete.")
   }
@@ -113,7 +110,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
     }
   }
 
-  private def doJoinGroup(group: Group,
+  private def doJoinGroup(group: ConsumerGroupMetadata,
                           consumerId: String,
                           topics: Set[String],
                           sessionTimeoutMs: Int,
@@ -154,14 +151,10 @@ class ConsumerCoordinator(val config: KafkaConfig,
           }
         }
 
-        consumer.awaitingRebalance = true
-
-        val delayedJoinGroup = new DelayedJoinGroup(this, group, consumer, 2 * MaxSessionTimeoutMs, responseCallback)
-        val consumerGroupKey = ConsumerGroupKey(group.groupId)
-        joinGroupPurgatory.tryCompleteElseWatch(delayedJoinGroup, Seq(consumerGroupKey))
+        consumer.awaitingRebalanceCallback = responseCallback
 
         if (group.is(PreparingRebalance))
-          rebalancePurgatory.checkAndComplete(consumerGroupKey)
+          rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
       }
     }
   }
@@ -199,34 +192,36 @@ class ConsumerCoordinator(val config: KafkaConfig,
   /**
    * Complete existing DelayedHeartbeats for the given consumer and schedule the next one
    */
-  private def completeAndScheduleNextHeartbeatExpiration(group: Group, consumer: Consumer) {
+  private def completeAndScheduleNextHeartbeatExpiration(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
+    // complete current heartbeat expectation
     consumer.latestHeartbeat = SystemTime.milliseconds
     val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId)
-    // TODO: can we fix DelayedOperationPurgatory to remove keys in watchersForKey with empty watchers list?
     heartbeatPurgatory.checkAndComplete(consumerKey)
-    val heartbeatDeadline = consumer.latestHeartbeat + consumer.sessionTimeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, consumer, heartbeatDeadline, consumer.sessionTimeoutMs)
+
+    // reschedule the next heartbeat expiration deadline
+    val newHeartbeatDeadline = consumer.latestHeartbeat + consumer.sessionTimeoutMs
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, consumer, newHeartbeatDeadline, consumer.sessionTimeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey))
   }
 
   private def addConsumer(consumerId: String,
                           topics: Set[String],
                           sessionTimeoutMs: Int,
-                          group: Group) = {
-    val consumer = new Consumer(consumerId, group.groupId, topics, sessionTimeoutMs)
+                          group: ConsumerGroupMetadata) = {
+    val consumer = new ConsumerMetadata(consumerId, group.groupId, topics, sessionTimeoutMs)
     val topicsToBind = topics -- group.topics
     group.add(consumer.consumerId, consumer)
     coordinatorMetadata.bindGroupToTopics(group.groupId, topicsToBind)
     consumer
   }
 
-  private def removeConsumer(group: Group, consumer: Consumer) {
+  private def removeConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
     group.remove(consumer.consumerId)
     val topicsToUnbind = consumer.topics -- group.topics
     coordinatorMetadata.unbindGroupFromTopics(group.groupId, topicsToUnbind)
   }
 
-  private def updateConsumer(group: Group, consumer: Consumer, topics: Set[String]) {
+  private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
     val topicsToBind = topics -- group.topics
     group.remove(consumer.consumerId)
     val topicsToUnbind = consumer.topics -- group.topics
@@ -235,14 +230,14 @@ class ConsumerCoordinator(val config: KafkaConfig,
     coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
   }
 
-  private def maybePrepareRebalance(group: Group) {
+  private def maybePrepareRebalance(group: ConsumerGroupMetadata) {
     group synchronized {
       if (group.canRebalance)
         prepareRebalance(group)
     }
   }
 
-  private def prepareRebalance(group: Group) {
+  private def prepareRebalance(group: ConsumerGroupMetadata) {
     group.transitionTo(PreparingRebalance)
     group.generationId += 1
     info("Preparing to rebalance group %s generation %s".format(group.groupId, group.generationId))
@@ -253,7 +248,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
     rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey))
   }
 
-  private def rebalance(group: Group) {
+  private def rebalance(group: ConsumerGroupMetadata) {
+    assert(group.notYetRejoinedConsumers == List.empty[ConsumerMetadata])
+
     group.transitionTo(Rebalancing)
     info("Rebalancing group %s generation %s".format(group.groupId, group.generationId))
 
@@ -263,11 +260,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
     group.transitionTo(Stable)
     info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
-    val consumerGroupKey = ConsumerGroupKey(group.groupId)
-    joinGroupPurgatory.checkAndComplete(consumerGroupKey)
   }
 
-  private def onConsumerHeartbeatExpired(group: Group, consumer: Consumer) {
+  private def onConsumerHeartbeatExpired(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
     trace("Consumer %s in group %s has failed".format(consumer.consumerId, group.groupId))
     removeConsumer(group, consumer)
     maybePrepareRebalance(group)
@@ -275,7 +270,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
   private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId))
 
-  private def reassignPartitions(group: Group) = {
+  private def reassignPartitions(group: ConsumerGroupMetadata) = {
     val assignor = PartitionAssignor.createInstance(group.partitionAssignmentStrategy)
     val topicsPerConsumer = group.topicsPerConsumer
     val partitionsPerTopic = coordinatorMetadata.partitionsPerTopic
@@ -286,31 +281,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
     assignedPartitionsPerConsumer
   }
 
-  def tryCompleteJoinGroup(group: Group, forceComplete: () => Boolean) = {
-    group synchronized {
-      if (group.is(Stable))
-        forceComplete()
-      else false
-    }
-  }
-
-  def onExpirationJoinGroup() {
-    throw new IllegalStateException("DelayedJoinGroup should never expire")
-  }
-
-  def onCompleteJoinGroup(group: Group,
-                          consumer: Consumer,
-                          responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) {
-    group synchronized {
-      consumer.awaitingRebalance = false
-      completeAndScheduleNextHeartbeatExpiration(group, consumer)
-      responseCallback(consumer.assignedTopicPartitions, consumer.consumerId, group.generationId, Errors.NONE.code)
-    }
-  }
-
-  def tryCompleteRebalance(group: Group, forceComplete: () => Boolean) = {
+  def tryCompleteRebalance(group: ConsumerGroupMetadata, forceComplete: () => Boolean) = {
     group synchronized {
-      if (group.allConsumersRejoined)
+      if (group.notYetRejoinedConsumers == List.empty[ConsumerMetadata])
         forceComplete()
       else false
     }
@@ -320,7 +293,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
     // TODO: add metrics for rebalance timeouts
   }
 
-  def onCompleteRebalance(group: Group) {
+  def onCompleteRebalance(group: ConsumerGroupMetadata) {
     group synchronized {
       val failedConsumers = group.notYetRejoinedConsumers
       if (group.isEmpty || !failedConsumers.isEmpty) {
@@ -335,12 +308,22 @@ class ConsumerCoordinator(val config: KafkaConfig,
           coordinatorMetadata.removeGroup(group.groupId, group.topics)
         }
       }
-      if (!group.is(Dead))
+      if (!group.is(Dead)) {
+        // assign partitions to existing consumers of the group according to the partitioning strategy
         rebalance(group)
+
+        // trigger the awaiting join group response callback for all the consumers after rebalancing
+        for (consumer <- group.allConsumers) {
+          assert(consumer.awaitingRebalanceCallback != null)
+          consumer.awaitingRebalanceCallback(consumer.assignedTopicPartitions, consumer.consumerId, group.generationId, Errors.NONE.code)
+          consumer.awaitingRebalanceCallback = null
+          completeAndScheduleNextHeartbeatExpiration(group, consumer)
+        }
+      }
     }
   }
 
-  def tryCompleteHeartbeat(group: Group, consumer: Consumer, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
+  def tryCompleteHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
     group synchronized {
       if (shouldKeepConsumerAlive(consumer, heartbeatDeadline))
         forceComplete()
@@ -348,7 +331,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
     }
   }
 
-  def onExpirationHeartbeat(group: Group, consumer: Consumer, heartbeatDeadline: Long) {
+  def onExpirationHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long) {
     group synchronized {
       if (!shouldKeepConsumerAlive(consumer, heartbeatDeadline))
         onConsumerHeartbeatExpired(group, consumer)
@@ -357,6 +340,6 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
   def onCompleteHeartbeat() {}
 
-  private def shouldKeepConsumerAlive(consumer: Consumer, heartbeatDeadline: Long) =
-    consumer.awaitingRebalance || consumer.latestHeartbeat > heartbeatDeadline - consumer.sessionTimeoutMs
+  private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) =
+    consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
new file mode 100644
index 0000000..47bdfa7
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.utils.nonthreadsafe
+
+import java.util.UUID
+
+import collection.mutable
+
+private[coordinator] sealed trait GroupState { def state: Byte }
+
+/**
+ * Consumer group is preparing to rebalance
+ *
+ * action: respond to heartbeats with an ILLEGAL GENERATION error code
+ * transition: some consumers have joined by the timeout => Rebalancing
+ *             all consumers have left the group => Dead
+ */
+private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
+
+/**
+ * Consumer group is rebalancing
+ *
+ * action: compute the group's partition assignment
+ *         send the join-group response with new partition assignment when rebalance is complete
+ * transition: partition assignment has been computed => Stable
+ */
+private[coordinator] case object Rebalancing extends GroupState { val state: Byte = 2 }
+
+/**
+ * Consumer group is stable
+ *
+ * action: respond to consumer heartbeats normally
+ * transition: consumer failure detected via heartbeat => PreparingRebalance
+ *             consumer join-group received => PreparingRebalance
+ *             zookeeper topic watcher fired => PreparingRebalance
+ */
+private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }
+
+/**
+ * Consumer group has no more members
+ *
+ * action: none
+ * transition: none
+ */
+private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
+
+
+/**
+ * A consumer group contains the following metadata:
+ *
+ *  Membership metadata:
+ *  1. Consumers registered in this group
+ *  2. Partition assignment strategy for this group
+ *
+ *  State metadata:
+ *  1. group state
+ *  2. generation id
+ */
+@nonthreadsafe
+private[coordinator] class ConsumerGroupMetadata(val groupId: String,
+                                                 val partitionAssignmentStrategy: String) {
+
+  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
+    Map(Dead -> Set(PreparingRebalance),
+      Stable -> Set(Rebalancing),
+      PreparingRebalance -> Set(Stable),
+      Rebalancing -> Set(PreparingRebalance))
+
+  private val consumers = new mutable.HashMap[String, ConsumerMetadata]
+  private var state: GroupState = Stable
+  var generationId = 0
+
+  def is(groupState: GroupState) = state == groupState
+  def has(consumerId: String) = consumers.contains(consumerId)
+  def get(consumerId: String) = consumers(consumerId)
+
+  def add(consumerId: String, consumer: ConsumerMetadata) {
+    consumers.put(consumerId, consumer)
+  }
+
+  def remove(consumerId: String) {
+    consumers.remove(consumerId)
+  }
+
+  def isEmpty = consumers.isEmpty
+
+  def topicsPerConsumer = consumers.mapValues(_.topics).toMap
+
+  def topics = consumers.values.flatMap(_.topics).toSet
+
+  def notYetRejoinedConsumers = consumers.values.filter(_.awaitingRebalanceCallback == null).toList
+
+  def allConsumers = consumers.values.toList
+
+  def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) =>
+    timeout.max(consumer.sessionTimeoutMs)
+  }
+
+  // TODO: decide if ids should be predictable or random
+  def generateNextConsumerId = UUID.randomUUID().toString
+
+  def canRebalance = state == Stable
+
+  def transitionTo(groupState: GroupState) {
+    assertValidTransition(groupState)
+    state = groupState
+  }
+
+  private def assertValidTransition(targetState: GroupState) {
+    if (!validPreviousStates(targetState).contains(state))
+      throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
+        .format(groupId, validPreviousStates(targetState).mkString(","), targetState, state))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
new file mode 100644
index 0000000..d5486cf
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.common.TopicAndPartition
+import kafka.utils.nonthreadsafe
+
+/**
+ * Consumer metadata contains the following metadata:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Subscription metadata:
+ * 1. subscribed topics
+ * 2. assigned partitions for the subscribed topics
+ *
+ * In addition, it contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: while the consumer group is in the prepare-rebalance state,
+ *                                 the consumer's join-group response callback is kept in this
+ *                                 metadata once the consumer has sent its join-group request
+ */
+@nonthreadsafe
+private[coordinator] class ConsumerMetadata(val consumerId: String,
+                                            val groupId: String,
+                                            var topics: Set[String],
+                                            val sessionTimeoutMs: Int) {
+
+  var awaitingRebalanceCallback: (Set[TopicAndPartition], String, Int, Short) => Unit = null
+  var assignedTopicPartitions = Set.empty[TopicAndPartition]
+  var latestHeartbeat: Long = -1
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index 88e82b6..c39e6de 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -34,7 +34,7 @@ import scala.collection.mutable
 @threadsafe
 private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
                                                zkClient: ZkClient,
-                                               maybePrepareRebalance: Group => Unit) {
+                                               maybePrepareRebalance: ConsumerGroupMetadata => Unit) {
 
   /**
    * NOTE: If a group lock and coordinatorLock are simultaneously needed,
@@ -45,7 +45,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
   /**
    * These should be guarded by metadataLock
    */
-  private val groups = new mutable.HashMap[String, Group]
+  private val groups = new mutable.HashMap[String, ConsumerGroupMetadata]
   private val groupsPerTopic = new mutable.HashMap[String, Set[String]]
   private val topicPartitionCounts = new mutable.HashMap[String, Int]
   private val topicPartitionChangeListeners = new mutable.HashMap[String, TopicPartitionChangeListener]
@@ -80,7 +80,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
    */
   def addGroup(groupId: String, partitionAssignmentStrategy: String) = {
     inWriteLock(metadataLock) {
-      groups.getOrElseUpdate(groupId, new Group(groupId, partitionAssignmentStrategy))
+      groups.getOrElseUpdate(groupId, new ConsumerGroupMetadata(groupId, partitionAssignmentStrategy))
     }
   }
 
@@ -195,7 +195,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
           topicPartitionCounts.put(topic, numPartitions)
           groupsPerTopic(topic).map(groupId => groups(groupId))
         }
-        else Set.empty[Group]
+        else Set.empty[ConsumerGroupMetadata]
       }
       groupsToRebalance.foreach(maybePrepareRebalance)
     }
@@ -212,7 +212,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
           topicPartitionCounts.put(topic, 0)
           groupsPerTopic(topic).map(groupId => groups(groupId))
         }
-        else Set.empty[Group]
+        else Set.empty[ConsumerGroupMetadata]
       }
       groupsToRebalance.foreach(maybePrepareRebalance)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
index b3360cc..70a710c 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -24,8 +24,8 @@ import kafka.server.DelayedOperation
  * Heartbeats are paused during rebalance.
  */
 private[coordinator] class DelayedHeartbeat(consumerCoordinator: ConsumerCoordinator,
-                                            group: Group,
-                                            consumer: Consumer,
+                                            group: ConsumerGroupMetadata,
+                                            consumer: ConsumerMetadata,
                                             heartbeatDeadline: Long,
                                             sessionTimeout: Long)
   extends DelayedOperation(sessionTimeout) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
deleted file mode 100644
index 8f57d38..0000000
--- a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.common.TopicAndPartition
-import kafka.server.DelayedOperation
-
-/**
- * Delayed join-group operations that are kept in the purgatory before the partition assignment completed
- *
- * These operation should never expire; when the rebalance has completed, all consumer's
- * join-group operations will be completed by sending back the response with the
- * calculated partition assignment.
- */
-private[coordinator] class DelayedJoinGroup(consumerCoordinator: ConsumerCoordinator,
-                                            group: Group,
-                                            consumer: Consumer,
-                                            sessionTimeout: Long,
-                                            responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit)
-  extends DelayedOperation(sessionTimeout) {
-  override def tryComplete(): Boolean = consumerCoordinator.tryCompleteJoinGroup(group, forceComplete)
-  override def onExpiration() = consumerCoordinator.onExpirationJoinGroup()
-  override def onComplete() = consumerCoordinator.onCompleteJoinGroup(group, consumer, responseCallback)
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
index 689621c..8247d33 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
@@ -30,7 +30,7 @@ import kafka.server.DelayedOperation
  * the rest of the group.
  */
 private[coordinator] class DelayedRebalance(consumerCoordinator: ConsumerCoordinator,
-                                            group: Group,
+                                            group: ConsumerGroupMetadata,
                                             sessionTimeout: Long)
   extends DelayedOperation(sessionTimeout) {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/coordinator/Group.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/Group.scala b/core/src/main/scala/kafka/coordinator/Group.scala
deleted file mode 100644
index 048eeee..0000000
--- a/core/src/main/scala/kafka/coordinator/Group.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.utils.nonthreadsafe
-
-import java.util.UUID
-
-import collection.mutable
-
-private[coordinator] sealed trait GroupState { def state: Byte }
-
-/**
- * Consumer group is preparing to rebalance
- *
- * action: respond to heartbeats with an ILLEGAL GENERATION error code
- * transition: some consumers have joined by the timeout => Rebalancing
- *             all consumers have left the group => Dead
- */
-private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
-
-/**
- * Consumer group is rebalancing
- *
- * action: compute the group's partition assignment
- *         send the join-group response with new partition assignment when rebalance is complete
- * transition: partition assignment has been computed => Stable
- */
-private[coordinator] case object Rebalancing extends GroupState { val state: Byte = 2 }
-
-/**
- * Consumer group is stable
- *
- * action: respond to consumer heartbeats normally
- * transition: consumer failure detected via heartbeat => PreparingRebalance
- *             consumer join-group received => PreparingRebalance
- *             zookeeper topic watcher fired => PreparingRebalance
- */
-private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }
-
-/**
- * Consumer group has no more members
- *
- * action: none
- * transition: none
- */
-private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
-
-
-/**
- * A group contains the following metadata:
- *
- *  Membership metadata:
- *  1. Consumers registered in this group
- *  2. Partition assignment strategy for this group
- *
- *  State metadata:
- *  1. group state
- *  2. generation id
- */
-@nonthreadsafe
-private[coordinator] class Group(val groupId: String,
-                                 val partitionAssignmentStrategy: String) {
-
-  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
-    Map(Dead -> Set(PreparingRebalance),
-      Stable -> Set(Rebalancing),
-      PreparingRebalance -> Set(Stable),
-      Rebalancing -> Set(PreparingRebalance))
-
-  private val consumers = new mutable.HashMap[String, Consumer]
-  private var state: GroupState = Stable
-  var generationId = 0
-
-  def is(groupState: GroupState) = state == groupState
-  def has(consumerId: String) = consumers.contains(consumerId)
-  def get(consumerId: String) = consumers(consumerId)
-
-  def add(consumerId: String, consumer: Consumer) {
-    consumers.put(consumerId, consumer)
-  }
-
-  def remove(consumerId: String) {
-    consumers.remove(consumerId)
-  }
-
-  def isEmpty = consumers.isEmpty
-
-  def topicsPerConsumer = consumers.mapValues(_.topics).toMap
-
-  def topics = consumers.values.flatMap(_.topics).toSet
-
-  def allConsumersRejoined = consumers.values.forall(_.awaitingRebalance)
-
-  def notYetRejoinedConsumers = consumers.values.filter(!_.awaitingRebalance).toList
-
-  def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) =>
-    timeout.max(consumer.sessionTimeoutMs)
-  }
-
-  // TODO: decide if ids should be predictable or random
-  def generateNextConsumerId = UUID.randomUUID().toString
-
-  def canRebalance = state == Stable
-
-  def transitionTo(groupState: GroupState) {
-    assertValidTransition(groupState)
-    state = groupState
-  }
-
-  private def assertValidTransition(targetState: GroupState) {
-    if (!validPreviousStates(targetState).contains(state))
-      throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
-        .format(groupId, validPreviousStates(targetState).mkString(","), targetState, state))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala b/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
deleted file mode 100644
index b6b9f5f..0000000
--- a/core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.common.TopicAndPartition
-import kafka.utils.nonthreadsafe
-
-/**
- * A consumer contains the following metadata:
- *
- * Heartbeat metadata:
- * 1. negotiated heartbeat session timeout
- * 2. timestamp of the latest heartbeat
- *
- * Subscription metadata:
- * 1. subscribed topics
- * 2. assigned partitions for the subscribed topics
- */
-@nonthreadsafe
-private[coordinator] class Consumer(val consumerId: String,
-                                    val groupId: String,
-                                    var topics: Set[String],
-                                    val sessionTimeoutMs: Int) {
-
-  var awaitingRebalance = false
-  var assignedTopicPartitions = Set.empty[TopicAndPartition]
-  var latestHeartbeat: Long = -1
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 2ed9b46..123078d 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -19,11 +19,13 @@ package kafka.server
 
 import kafka.utils._
 import kafka.utils.timer._
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.metrics.KafkaMetricsGroup
 
 import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
+import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import org.apache.kafka.common.utils.Utils
 
@@ -122,7 +124,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
   private[this] val timeoutTimer = new Timer(executor)
 
   /* a list of operation watching keys */
-  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
+  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
+
+  private val removeWatchersLock = new ReentrantReadWriteLock()
 
   // the number of estimated total operations in the purgatory
   private[this] val estimatedTotalOperations = new AtomicInteger(0)
@@ -217,7 +221,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
    * @return the number of completed operations during this process
    */
   def checkAndComplete(key: Any): Int = {
-    val watchers = watchersForKey.get(key)
+    val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
     if(watchers == null)
       0
     else
@@ -229,7 +233,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
    * on multiple lists, and some of its watched entries may still be in the watch lists
    * even when it has been completed, this number may be larger than the number of real operations watched
    */
-  def watched() = watchersForKey.values.map(_.watched).sum
+  def watched() = allWatchers.map(_.watched).sum
 
   /**
    * Return the number of delayed operations in the expiry queue
@@ -239,7 +243,22 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
   /*
    * Return the watch list of the given key
    */
-  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
+  private def watchersFor(key: Any) = inReadLock(removeWatchersLock) { watchersForKey.getAndMaybePut(key) }
+
+  /*
+   * Return all the current watcher lists
+   */
+  private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values }
+
+  /*
+   * Remove the key from watcher lists if its list is empty
+   */
+  private def removeKeyIfEmpty(key: Any) = inWriteLock(removeWatchersLock) {
+    val watchers = watchersForKey.get(key)
+    if (watchers != null && watchers.watched == 0) {
+      watchersForKey.remove(key)
+    }
+  }
 
   /**
    * Shutdown the expire reaper thread
@@ -252,11 +271,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
   /**
    * A linked list of watched delayed operations based on some key
    */
-  private class Watchers {
+  private class Watchers(val key: Any) {
 
     private[this] val operations = new LinkedList[T]()
 
-    def watched(): Int = operations synchronized operations.size
+    def watched: Int = operations synchronized operations.size
 
     // add the element to watch
     def watch(t: T) {
@@ -266,8 +285,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
     // traverse the list and try to complete some watched elements
     def tryCompleteWatched(): Int = {
 
+      var completed = 0
       operations synchronized {
-        var completed = 0
         val iter = operations.iterator()
         while (iter.hasNext) {
           val curr = iter.next()
@@ -279,8 +298,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
             iter.remove()
           }
         }
-        completed
+
+        if (operations.size == 0)
+          removeKeyIfEmpty(key)
       }
+      completed
     }
 
     // traverse the list and purge elements that are already completed by others
@@ -295,6 +317,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
             purged += 1
           }
         }
+
+        if (operations.size == 0)
+          removeKeyIfEmpty(key)
       }
       purged
     }
@@ -319,7 +344,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
         // a little overestimated total number of operations.
         estimatedTotalOperations.getAndSet(delayed)
         debug("Begin purging watch lists")
-        val purged = watchersForKey.values.map(_.purgeCompleted()).sum
+        val purged = allWatchers.map(_.purgeCompleted()).sum
         debug("Purged %d elements from watch lists.".format(purged))
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index 39d6d8a..744be3b 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -36,32 +36,37 @@ object TestPurgatoryPerformance {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser
+    val keySpaceSizeOpt = parser.accepts("key-space-size", "The total number of possible keys")
+      .withRequiredArg
+      .describedAs("total_num_possible_keys")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100)
     val numRequestsOpt = parser.accepts("num", "The number of requests")
       .withRequiredArg
       .describedAs("num_requests")
       .ofType(classOf[java.lang.Double])
-    val requestRateOpt = parser.accepts("rate", "The request rate")
+    val requestRateOpt = parser.accepts("rate", "The request rate per second")
       .withRequiredArg
       .describedAs("request_per_second")
       .ofType(classOf[java.lang.Double])
-    val requestDataSizeOpt = parser.accepts("size", "The request data size")
+    val requestDataSizeOpt = parser.accepts("size", "The request data size in bytes")
       .withRequiredArg
       .describedAs("num_bytes")
       .ofType(classOf[java.lang.Long])
-    val numKeysOpt = parser.accepts("keys", "The number of keys")
+    val numKeysOpt = parser.accepts("keys", "The number of keys for each request")
       .withRequiredArg
       .describedAs("num_keys")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(3)
-    val timeoutOpt = parser.accepts("timeout", "The request timeout")
+    val timeoutOpt = parser.accepts("timeout", "The request timeout in ms")
       .withRequiredArg
       .describedAs("timeout_milliseconds")
       .ofType(classOf[java.lang.Long])
-    val pct75Opt = parser.accepts("pct75", "75th percentile of request latency (log-normal distribution)")
+    val pct75Opt = parser.accepts("pct75", "75th percentile of request latency in ms (log-normal distribution)")
       .withRequiredArg
       .describedAs("75th_percentile")
       .ofType(classOf[java.lang.Double])
-    val pct50Opt = parser.accepts("pct50", "50th percentile of request latency (log-normal distribution)")
+    val pct50Opt = parser.accepts("pct50", "50th percentile of request latency in ms (log-normal distribution)")
       .withRequiredArg
       .describedAs("50th_percentile")
       .ofType(classOf[java.lang.Double])
@@ -78,6 +83,7 @@ object TestPurgatoryPerformance {
     val numRequests = options.valueOf(numRequestsOpt).intValue
     val requestRate = options.valueOf(requestRateOpt).doubleValue
     val requestDataSize = options.valueOf(requestDataSizeOpt).intValue
+    val numPossibleKeys = options.valueOf(keySpaceSizeOpt).intValue
     val numKeys = options.valueOf(numKeysOpt).intValue
     val timeout = options.valueOf(timeoutOpt).longValue
     val pct75 = options.valueOf(pct75Opt).doubleValue
@@ -97,7 +103,8 @@ object TestPurgatoryPerformance {
     val initialCpuTimeNano = getProcessCpuTimeNanos(osMXBean)
     val latch = new CountDownLatch(numRequests)
     val start = System.currentTimeMillis
-    val keys = (0 until numKeys).map(i => "fakeKey%d".format(i))
+    val rand = new Random()
+    val keys = (0 until numKeys).map(i => "fakeKey%d".format(rand.nextInt(numPossibleKeys)))
     @volatile var requestArrivalTime = start
     @volatile var end = 0L
     val generator = new Runnable {
@@ -133,7 +140,7 @@ object TestPurgatoryPerformance {
       println("# enqueue rate (%d requests):".format(numRequests))
       val gcCountHeader = gcNames.map("<" + _ + " count>").mkString(" ")
       val gcTimeHeader = gcNames.map("<" + _ + " time ms>").mkString(" ")
-      println("# <elapsed time ms> <target rate> <actual rate> <process cpu time ms> %s %s".format(gcCountHeader, gcTimeHeader))
+      println("# <elapsed time ms>\t<target rate>\t<actual rate>\t<process cpu time ms>\t%s\t%s".format(gcCountHeader, gcTimeHeader))
     }
 
     val targetRate = numRequests.toDouble * 1000d / (requestArrivalTime - start).toDouble
@@ -143,7 +150,7 @@ object TestPurgatoryPerformance {
     val gcCounts = gcMXBeans.map(_.getCollectionCount)
     val gcTimes = gcMXBeans.map(_.getCollectionTime)
 
-    println("%d %f %f %d %s %s".format(done - start, targetRate, actualRate, cpuTime.getOrElse(-1L), gcCounts.mkString(" "), gcTimes.mkString(" ")))
+    println("%d\t%f\t%f\t%d\t%s\t%s".format(done - start, targetRate, actualRate, cpuTime.getOrElse(-1L), gcCounts.mkString(" "), gcTimes.mkString(" ")))
 
     purgatory.shutdown()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala
new file mode 100644
index 0000000..b69c993
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import junit.framework.Assert._
+import org.junit.{Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Test group state transitions
+ */
+class ConsumerGroupMetadataTest extends JUnitSuite {
+  var group: ConsumerGroupMetadata = null
+
+  @Before
+  def setUp() {
+    group = new ConsumerGroupMetadata("test", "range")
+  }
+
+  @Test
+  def testCanRebalanceWhenStable() {
+    assertTrue(group.canRebalance)
+  }
+
+  @Test
+  def testCannotRebalanceWhenPreparingRebalance() {
+    group.transitionTo(PreparingRebalance)
+    assertFalse(group.canRebalance)
+  }
+
+  @Test
+  def testCannotRebalanceWhenRebalancing() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Rebalancing)
+    assertFalse(group.canRebalance)
+  }
+
+  @Test
+  def testCannotRebalanceWhenDead() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Dead)
+    assertFalse(group.canRebalance)
+  }
+
+  @Test
+  def testStableToPreparingRebalanceTransition() {
+    group.transitionTo(PreparingRebalance)
+    assertState(group, PreparingRebalance)
+  }
+
+  @Test
+  def testPreparingRebalanceToRebalancingTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Rebalancing)
+    assertState(group, Rebalancing)
+  }
+
+  @Test
+  def testPreparingRebalanceToDeadTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Dead)
+    assertState(group, Dead)
+  }
+
+  @Test
+  def testRebalancingToStableTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Rebalancing)
+    group.transitionTo(Stable)
+    assertState(group, Stable)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testStableToStableIllegalTransition() {
+    group.transitionTo(Stable)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testStableToRebalancingIllegalTransition() {
+    group.transitionTo(Rebalancing)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testStableToDeadIllegalTransition() {
+    group.transitionTo(Dead)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testPreparingRebalanceToPreparingRebalanceIllegalTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(PreparingRebalance)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testPreparingRebalanceToStableIllegalTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Stable)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testRebalancingToRebalancingIllegalTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Rebalancing)
+    group.transitionTo(Rebalancing)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testRebalancingToPreparingRebalanceTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Rebalancing)
+    group.transitionTo(PreparingRebalance)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testRebalancingToDeadIllegalTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Rebalancing)
+    group.transitionTo(Dead)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testDeadToDeadIllegalTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Dead)
+    group.transitionTo(Dead)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testDeadToStableIllegalTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Dead)
+    group.transitionTo(Stable)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testDeadToPreparingRebalanceIllegalTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Dead)
+    group.transitionTo(PreparingRebalance)
+  }
+
+  @Test(expected = classOf[IllegalStateException])
+  def testDeadToRebalancingIllegalTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Dead)
+    group.transitionTo(Rebalancing)
+  }
+
+  private def assertState(group: ConsumerGroupMetadata, targetState: GroupState) {
+    val states: Set[GroupState] = Set(Stable, PreparingRebalance, Rebalancing, Dead)
+    val otherStates = states - targetState
+    otherStates.foreach { otherState =>
+      assertFalse(group.is(otherState))
+    }
+    assertTrue(group.is(targetState))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ad64662/core/src/test/scala/unit/kafka/coordinator/GroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupTest.scala
deleted file mode 100644
index 6561a1d..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/GroupTest.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import junit.framework.Assert._
-import org.junit.{Before, Test}
-import org.scalatest.junit.JUnitSuite
-
-/**
- * Test group state transitions
- */
-class GroupTest extends JUnitSuite {
-  var group: Group = null
-
-  @Before
-  def setUp() {
-    group = new Group("test", "range")
-  }
-
-  @Test
-  def testCanRebalanceWhenStable() {
-    assertTrue(group.canRebalance)
-  }
-
-  @Test
-  def testCannotRebalanceWhenPreparingRebalance() {
-    group.transitionTo(PreparingRebalance)
-    assertFalse(group.canRebalance)
-  }
-
-  @Test
-  def testCannotRebalanceWhenRebalancing() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    assertFalse(group.canRebalance)
-  }
-
-  @Test
-  def testCannotRebalanceWhenDead() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    assertFalse(group.canRebalance)
-  }
-
-  @Test
-  def testStableToPreparingRebalanceTransition() {
-    group.transitionTo(PreparingRebalance)
-    assertState(group, PreparingRebalance)
-  }
-
-  @Test
-  def testPreparingRebalanceToRebalancingTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    assertState(group, Rebalancing)
-  }
-
-  @Test
-  def testPreparingRebalanceToDeadTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    assertState(group, Dead)
-  }
-
-  @Test
-  def testRebalancingToStableTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    group.transitionTo(Stable)
-    assertState(group, Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testStableToStableIllegalTransition() {
-    group.transitionTo(Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testStableToRebalancingIllegalTransition() {
-    group.transitionTo(Rebalancing)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testStableToDeadIllegalTransition() {
-    group.transitionTo(Dead)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testPreparingRebalanceToPreparingRebalanceIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(PreparingRebalance)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testPreparingRebalanceToStableIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testRebalancingToRebalancingIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    group.transitionTo(Rebalancing)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testRebalancingToPreparingRebalanceTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    group.transitionTo(PreparingRebalance)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testRebalancingToDeadIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    group.transitionTo(Dead)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToDeadIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(Dead)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToStableIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToPreparingRebalanceIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(PreparingRebalance)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToRebalancingIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(Rebalancing)
-  }
-
-  private def assertState(group: Group, targetState: GroupState) {
-    val states: Set[GroupState] = Set(Stable, PreparingRebalance, Rebalancing, Dead)
-    val otherStates = states - targetState
-    otherStates.foreach { otherState =>
-      assertFalse(group.is(otherState))
-    }
-    assertTrue(group.is(targetState))
-  }
-}


Mime
View raw message