kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer
Date Wed, 21 Oct 2015 19:08:46 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 549a96b..a77979a 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -135,8 +135,8 @@ object TestOffsetManager {
       val id = random.nextInt().abs % numGroups
       val group = "group-" + id
       try {
-        metadataChannel.send(ConsumerMetadataRequest(group))
-        val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
+        metadataChannel.send(GroupMetadataRequest(group))
+        val coordinatorId = GroupMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
 
         val channel = if (channels.contains(coordinatorId))
           channels(coordinatorId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index e2a75e2..1266598 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -35,18 +35,18 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
   private val HostsString = Hosts.mkString(AclCommand.Delimiter.toString)
 
   private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
-  private val ConsumerGroupResources = Set(new Resource(ConsumerGroup, "testGroup-1"), new Resource(ConsumerGroup, "testGroup-2"))
+  private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2"))
 
   private val ResourceToCommand = Map[Set[Resource], Array[String]](
     TopicResources -> Array("--topic", "test-1,test-2"),
     Set(Resource.ClusterResource) -> Array("--cluster"),
-    ConsumerGroupResources -> Array("--consumer-group", "testGroup-1,testGroup-2")
+    GroupResources -> Array("--group", "testGroup-1,testGroup-2")
   )
 
   private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
     TopicResources -> (Set(Read, Write, Describe), Array("--operations", "Read,Write,Describe")),
     Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operations", "Create,ClusterAction")),
-    ConsumerGroupResources -> (Set(Read).toSet[Operation], Array("--operations", "Read"))
+    GroupResources -> (Set(Read).toSet[Operation], Array("--operations", "Read"))
   )
 
   private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]](
@@ -56,7 +56,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
 
   private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]](
     TopicResources -> AclCommand.getAcls(Users, Allow, Set(Read, Describe), Hosts),
-    ConsumerGroupResources -> AclCommand.getAcls(Users, Allow, Set(Read), Hosts)
+    GroupResources -> AclCommand.getAcls(Users, Allow, Set(Read), Hosts)
   )
 
   private val CmdToResourcesToAcl = Map[Array[String], Map[Set[Resource], Set[Acl]]](

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index cab4813..820a825 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -24,7 +24,7 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils._
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
 
 class TopicCommandTest extends ZooKeeperTestHarness with Logging {
 
@@ -85,12 +85,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
-      "--topic", ConsumerCoordinator.OffsetsTopicName))
+      "--topic", GroupCoordinator.OffsetsTopicName))
     TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
 
     // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't
-    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName))
-    val deleteOffsetTopicPath = getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName)
+    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", GroupCoordinator.OffsetsTopicName))
+    val deleteOffsetTopicPath = getDeleteTopicPath(GroupCoordinator.OffsetsTopicName)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
         TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index b7e7967..09e9ce3 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -231,12 +231,12 @@ object SerializationTestUtils {
     ))
   }
 
-  def createConsumerMetadataRequest: ConsumerMetadataRequest = {
-    ConsumerMetadataRequest("group 1", clientId = "client 1")
+  def createConsumerMetadataRequest: GroupMetadataRequest = {
+    GroupMetadataRequest("group 1", clientId = "client 1")
   }
 
-  def createConsumerMetadataResponse: ConsumerMetadataResponse = {
-    ConsumerMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
+  def createConsumerMetadataResponse: GroupMetadataResponse = {
+    GroupMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
   }
 
   def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {
@@ -276,7 +276,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
-  private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
+  private val consumerMetadataResponseNoCoordinator = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
   private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0)
   private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1)
   private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 2e18e92..24fba45 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -22,7 +22,7 @@ import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.server.OffsetManager
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
 
 
 class TopicFilterTest extends JUnitSuite {
@@ -38,8 +38,8 @@ class TopicFilterTest extends JUnitSuite {
 
     val topicFilter2 = new Whitelist(".+")
     assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
-    assertFalse(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter2.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = true))
+    assertTrue(topicFilter2.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = false))
 
     val topicFilter3 = new Whitelist("white_listed-topic.+")
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -58,8 +58,8 @@ class TopicFilterTest extends JUnitSuite {
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
 
-    assertFalse(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter1.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = true))
+    assertTrue(topicFilter1.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = false))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
deleted file mode 100644
index c108955..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ /dev/null
@@ -1,447 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-
-import java.util.concurrent.TimeUnit
-
-import org.junit.Assert._
-import kafka.common.{OffsetAndMetadata, TopicAndPartition}
-import kafka.server.{OffsetManager, KafkaConfig}
-import kafka.utils.TestUtils
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
-import org.easymock.{IAnswer, EasyMock}
-import org.junit.{After, Before, Test}
-import org.scalatest.junit.JUnitSuite
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future, Promise}
-
-/**
- * Test ConsumerCoordinator responses
- */
-class ConsumerCoordinatorResponseTest extends JUnitSuite {
-  type JoinGroupCallbackParams = (Set[TopicAndPartition], String, Int, Short)
-  type JoinGroupCallback = (Set[TopicAndPartition], String, Int, Short) => Unit
-  type HeartbeatCallbackParams = Short
-  type HeartbeatCallback = Short => Unit
-  type CommitOffsetCallbackParams = Map[TopicAndPartition, Short]
-  type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit
-  type LeaveGroupCallbackParams = Short
-  type LeaveGroupCallback = Short => Unit
-
-  val ConsumerMinSessionTimeout = 10
-  val ConsumerMaxSessionTimeout = 200
-  val DefaultSessionTimeout = 100
-  var consumerCoordinator: ConsumerCoordinator = null
-  var offsetManager : OffsetManager = null
-
-  @Before
-  def setUp() {
-    val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-    props.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
-    props.setProperty(KafkaConfig.ConsumerMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
-    offsetManager = EasyMock.createStrictMock(classOf[OffsetManager])
-    consumerCoordinator = ConsumerCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager)
-    consumerCoordinator.startup()
-  }
-
-  @After
-  def tearDown() {
-    EasyMock.reset(offsetManager)
-    consumerCoordinator.shutdown()
-  }
-
-  @Test
-  def testJoinGroupWrongCoordinator() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = false)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, joinGroupErrorCode)
-  }
-
-  @Test
-  def testJoinGroupUnknownPartitionAssignmentStrategy() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "foo"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code, joinGroupErrorCode)
-  }
-
-  @Test
-  def testJoinGroupSessionTimeoutTooSmall() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMinSessionTimeout - 1, isCoordinatorForGroup = true)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
-  }
-
-  @Test
-  def testJoinGroupSessionTimeoutTooLarge() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMaxSessionTimeout + 1, isCoordinatorForGroup = true)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
-  }
-
-  @Test
-  def testJoinGroupUnknownConsumerNewGroup() {
-    val groupId = "groupId"
-    val consumerId = "consumerId"
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, joinGroupErrorCode)
-  }
-
-  @Test
-  def testValidJoinGroup() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-  }
-
-  @Test
-  def testJoinGroupInconsistentPartitionAssignmentStrategy() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "range"
-    val otherPartitionAssignmentStrategy = "roundrobin"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
-    EasyMock.reset(offsetManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, otherPartitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val otherJoinGroupErrorCode = otherJoinGroupResult._4
-    assertEquals(Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code, otherJoinGroupErrorCode)
-  }
-
-  @Test
-  def testJoinGroupUnknownConsumerExistingGroup() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val otherConsumerId = "consumerId"
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
-    EasyMock.reset(offsetManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val otherJoinGroupErrorCode = otherJoinGroupResult._4
-    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, otherJoinGroupErrorCode)
-  }
-
-  @Test
-  def testHeartbeatWrongCoordinator() {
-    val groupId = "groupId"
-    val consumerId = "consumerId"
-
-    val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = false)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, heartbeatResult)
-  }
-
-  @Test
-  def testHeartbeatUnknownGroup() {
-    val groupId = "groupId"
-    val consumerId = "consumerId"
-
-    val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = true)
-    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult)
-  }
-
-  @Test
-  def testHeartbeatUnknownConsumerExistingGroup() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val otherConsumerId = "consumerId"
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
-    EasyMock.reset(offsetManager)
-    val heartbeatResult = heartbeat(groupId, otherConsumerId, 1, isCoordinatorForGroup = true)
-    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult)
-  }
-
-  @Test
-  def testHeartbeatIllegalGeneration() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val assignedConsumerId = joinGroupResult._2
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
-    EasyMock.reset(offsetManager)
-    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 2, isCoordinatorForGroup = true)
-    assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
-  }
-
-  @Test
-  def testValidHeartbeat() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val assignedConsumerId = joinGroupResult._2
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
-    EasyMock.reset(offsetManager)
-    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1, isCoordinatorForGroup = true)
-    assertEquals(Errors.NONE.code, heartbeatResult)
-  }
-
-  @Test
-  def testCommitOffsetFromUnknownGroup() {
-    val groupId = "groupId"
-    val consumerId = "consumer"
-    val generationId = 1
-    val tp = new TopicAndPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
-
-    val commitOffsetResult = commitOffsets(groupId, consumerId, generationId, Map(tp -> offset), true)
-    assertEquals(Errors.ILLEGAL_GENERATION.code, commitOffsetResult(tp))
-  }
-
-  @Test
-  def testCommitOffsetWithDefaultGeneration() {
-    val groupId = "groupId"
-    val tp = new TopicAndPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
-
-    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_CONSUMER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset), true)
-    assertEquals(Errors.NONE.code, commitOffsetResult(tp))
-  }
-
-  @Test
-  def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
-    val groupId = "groupId"
-    val partitionAssignmentStrategy = "range"
-
-    // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
-    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
-      DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val assignedConsumerId = joinGroupResult._2
-    val initialGenerationId = joinGroupResult._3
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
-    // Then join with a new consumer to trigger a rebalance
-    EasyMock.reset(offsetManager)
-    sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
-      DefaultSessionTimeout, isCoordinatorForGroup = true)
-
-    // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress
-    EasyMock.reset(offsetManager)
-    val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true)
-    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
-  }
-
-  @Test
-  def testGenerationIdIncrementsOnRebalance() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val initialGenerationId = joinGroupResult._3
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(1, initialGenerationId)
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
-    EasyMock.reset(offsetManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val nextGenerationId = otherJoinGroupResult._3
-    val otherJoinGroupErrorCode = otherJoinGroupResult._4
-    assertEquals(2, nextGenerationId)
-    assertEquals(Errors.NONE.code, otherJoinGroupErrorCode)
-  }
-
-  @Test
-  def testLeaveGroupWrongCoordinator() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-
-    val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = false)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, leaveGroupResult)
-  }
-
-  @Test
-  def testLeaveGroupUnknownGroup() {
-    val groupId = "groupId"
-    val consumerId = "consumerId"
-
-    val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = true)
-    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult)
-  }
-
-  @Test
-  def testLeaveGroupUnknownConsumerExistingGroup() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val otherConsumerId = "consumerId"
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
-    EasyMock.reset(offsetManager)
-    val leaveGroupResult = leaveGroup(groupId, otherConsumerId, isCoordinatorForGroup = true)
-    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult)
-  }
-
-  @Test
-  def testValidLeaveGroup() {
-    val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val partitionAssignmentStrategy = "range"
-
-    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
-    val assignedConsumerId = joinGroupResult._2
-    val joinGroupErrorCode = joinGroupResult._4
-    assertEquals(Errors.NONE.code, joinGroupErrorCode)
-
-    EasyMock.reset(offsetManager)
-    val leaveGroupResult = leaveGroup(groupId, assignedConsumerId, isCoordinatorForGroup = true)
-    assertEquals(Errors.NONE.code, leaveGroupResult)
-  }
-
-  private def setupJoinGroupCallback: (Future[JoinGroupCallbackParams], JoinGroupCallback) = {
-    val responsePromise = Promise[JoinGroupCallbackParams]
-    val responseFuture = responsePromise.future
-    val responseCallback: JoinGroupCallback = (partitions, consumerId, generationId, errorCode) =>
-      responsePromise.success((partitions, consumerId, generationId, errorCode))
-    (responseFuture, responseCallback)
-  }
-
-  private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = {
-    val responsePromise = Promise[HeartbeatCallbackParams]
-    val responseFuture = responsePromise.future
-    val responseCallback: HeartbeatCallback = errorCode => responsePromise.success(errorCode)
-    (responseFuture, responseCallback)
-  }
-
-  private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = {
-    val responsePromise = Promise[CommitOffsetCallbackParams]
-    val responseFuture = responsePromise.future
-    val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets)
-    (responseFuture, responseCallback)
-  }
-
-  private def setupLeaveGroupCallback: (Future[LeaveGroupCallbackParams], LeaveGroupCallback) = {
-    val responsePromise = Promise[LeaveGroupCallbackParams]
-    val responseFuture = responsePromise.future
-    val responseCallback: LeaveGroupCallback = errorCode => responsePromise.success(errorCode)
-    (responseFuture, responseCallback)
-  }
-
-  private def sendJoinGroup(groupId: String,
-                            consumerId: String,
-                            partitionAssignmentStrategy: String,
-                            sessionTimeout: Int,
-                            isCoordinatorForGroup: Boolean): Future[JoinGroupCallbackParams] = {
-    val (responseFuture, responseCallback) = setupJoinGroupCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    EasyMock.replay(offsetManager)
-    consumerCoordinator.handleJoinGroup(groupId, consumerId, Set.empty, sessionTimeout, partitionAssignmentStrategy, responseCallback)
-    responseFuture
-  }
-
-  private def joinGroup(groupId: String,
-                        consumerId: String,
-                        partitionAssignmentStrategy: String,
-                        sessionTimeout: Int,
-                        isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = {
-    val responseFuture = sendJoinGroup(groupId, consumerId, partitionAssignmentStrategy, sessionTimeout, isCoordinatorForGroup)
-    // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
-    Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS))
-  }
-
-  private def heartbeat(groupId: String,
-                        consumerId: String,
-                        generationId: Int,
-                        isCoordinatorForGroup: Boolean): HeartbeatCallbackParams = {
-    val (responseFuture, responseCallback) = setupHeartbeatCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    EasyMock.replay(offsetManager)
-    consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
-    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
-  }
-
-  private def commitOffsets(groupId: String,
-                            consumerId: String,
-                            generationId: Int,
-                            offsets: Map[TopicAndPartition, OffsetAndMetadata],
-                            isCoordinatorForGroup: Boolean): CommitOffsetCallbackParams = {
-    val (responseFuture, responseCallback) = setupCommitOffsetsCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    val storeOffsetAnswer = new IAnswer[Unit] {
-      override def answer = responseCallback.apply(offsets.mapValues(_ => Errors.NONE.code))
-    }
-    EasyMock.expect(offsetManager.storeOffsets(groupId, consumerId, generationId, offsets, responseCallback))
-      .andAnswer(storeOffsetAnswer)
-    EasyMock.replay(offsetManager)
-    consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
-    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
-    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
-  }
-
-  private def leaveGroup(groupId: String, consumerId: String, isCoordinatorForGroup: Boolean): LeaveGroupCallbackParams = {
-    val (responseFuture, responseCallback) = setupHeartbeatCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    EasyMock.replay(offsetManager)
-    consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
-    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala
deleted file mode 100644
index 5d812c2..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import org.junit.Assert._
-import org.junit.{Before, Test}
-import org.scalatest.junit.JUnitSuite
-
-/**
- * Test group state transitions
- */
-class ConsumerGroupMetadataTest extends JUnitSuite {
-  var group: ConsumerGroupMetadata = null
-
-  @Before
-  def setUp() {
-    group = new ConsumerGroupMetadata("test", "range")
-  }
-
-  @Test
-  def testCanRebalanceWhenStable() {
-    assertTrue(group.canRebalance)
-  }
-
-  @Test
-  def testCannotRebalanceWhenPreparingRebalance() {
-    group.transitionTo(PreparingRebalance)
-    assertFalse(group.canRebalance)
-  }
-
-  @Test
-  def testCannotRebalanceWhenRebalancing() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    assertFalse(group.canRebalance)
-  }
-
-  @Test
-  def testCannotRebalanceWhenDead() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    assertFalse(group.canRebalance)
-  }
-
-  @Test
-  def testStableToPreparingRebalanceTransition() {
-    group.transitionTo(PreparingRebalance)
-    assertState(group, PreparingRebalance)
-  }
-
-  @Test
-  def testPreparingRebalanceToRebalancingTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    assertState(group, Rebalancing)
-  }
-
-  @Test
-  def testPreparingRebalanceToDeadTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    assertState(group, Dead)
-  }
-
-  @Test
-  def testRebalancingToStableTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    group.transitionTo(Stable)
-    assertState(group, Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testStableToStableIllegalTransition() {
-    group.transitionTo(Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testStableToRebalancingIllegalTransition() {
-    group.transitionTo(Rebalancing)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testStableToDeadIllegalTransition() {
-    group.transitionTo(Dead)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testPreparingRebalanceToPreparingRebalanceIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(PreparingRebalance)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testPreparingRebalanceToStableIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testRebalancingToRebalancingIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    group.transitionTo(Rebalancing)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testRebalancingToPreparingRebalanceTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    group.transitionTo(PreparingRebalance)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testRebalancingToDeadIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Rebalancing)
-    group.transitionTo(Dead)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToDeadIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(Dead)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToStableIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToPreparingRebalanceIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(PreparingRebalance)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToRebalancingIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(Rebalancing)
-  }
-
-  private def assertState(group: ConsumerGroupMetadata, targetState: GroupState) {
-    val states: Set[GroupState] = Set(Stable, PreparingRebalance, Rebalancing, Dead)
-    val otherStates = states - targetState
-    otherStates.foreach { otherState =>
-      assertFalse(group.is(otherState))
-    }
-    assertTrue(group.is(targetState))
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
index 3bc37e5..49a237b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
@@ -18,13 +18,9 @@
 package kafka.coordinator
 
 import kafka.server.KafkaConfig
-import kafka.utils.{TestUtils, ZkUtils}
-import kafka.utils.ZkUtils._
+import kafka.utils.TestUtils
 
 import org.junit.Assert._
-import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
-import org.apache.zookeeper.data.Stat
-import org.easymock.EasyMock
 import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite
 
@@ -34,15 +30,12 @@ import org.scalatest.junit.JUnitSuite
 class CoordinatorMetadataTest extends JUnitSuite {
   val DefaultNumPartitions = 8
   val DefaultNumReplicas = 2
-  var zkUtils: ZkUtils = null
   var coordinatorMetadata: CoordinatorMetadata = null
 
   @Before
   def setUp() {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
-    val zkClient = EasyMock.createStrictMock(classOf[ZkClient])
-    zkUtils = ZkUtils(zkClient, false)
-    coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkUtils, null)
+    coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId)
   }
 
   @Test
@@ -53,7 +46,8 @@ class CoordinatorMetadataTest extends JUnitSuite {
   @Test
   def testGetGroup() {
     val groupId = "group"
-    val expected = coordinatorMetadata.addGroup(groupId, "range")
+    val protocolType = "consumer"
+    val expected = coordinatorMetadata.addGroup(groupId, protocolType)
     val actual = coordinatorMetadata.getGroup(groupId)
     assertEquals(expected, actual)
   }
@@ -61,155 +55,17 @@ class CoordinatorMetadataTest extends JUnitSuite {
   @Test
   def testAddGroupReturnsPreexistingGroupIfItAlreadyExists() {
     val groupId = "group"
-    val group1 = coordinatorMetadata.addGroup(groupId, "range")
-    val group2 = coordinatorMetadata.addGroup(groupId, "range")
+    val protocolType = "consumer"
+    val group1 = coordinatorMetadata.addGroup(groupId, protocolType)
+    val group2 = coordinatorMetadata.addGroup(groupId, protocolType)
     assertEquals(group1, group2)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
-  def testBindNonexistentGroupToTopics() {
-    val groupId = "group"
-    val topics = Set("a")
-    coordinatorMetadata.bindGroupToTopics(groupId, topics)
-  }
-
-  @Test
-  def testBindGroupToTopicsNotListenedOn() {
-    val groupId = "group"
-    val topics = Set("a")
-    coordinatorMetadata.addGroup(groupId, "range")
-
-    expectZkClientSubscribeDataChanges(zkUtils, topics)
-    EasyMock.replay(zkUtils.zkClient)
-    coordinatorMetadata.bindGroupToTopics(groupId, topics)
-    assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
-  }
-
-  @Test
-  def testBindGroupToTopicsAlreadyListenedOn() {
-    val group1 = "group1"
-    val group2 = "group2"
-    val topics = Set("a")
-    coordinatorMetadata.addGroup(group1, "range")
-    coordinatorMetadata.addGroup(group2, "range")
-
-    expectZkClientSubscribeDataChanges(zkUtils, topics)
-    EasyMock.replay(zkUtils.zkClient)
-    coordinatorMetadata.bindGroupToTopics(group1, topics)
-    coordinatorMetadata.bindGroupToTopics(group2, topics)
-    assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testUnbindNonexistentGroupFromTopics() {
-    val groupId = "group"
-    val topics = Set("a")
-    coordinatorMetadata.unbindGroupFromTopics(groupId, topics)
-  }
-
-  @Test
-  def testUnbindGroupFromTopicsNotListenedOn() {
-    val groupId = "group"
-    val topics = Set("a")
-    coordinatorMetadata.addGroup(groupId, "range")
-
-    expectZkClientSubscribeDataChanges(zkUtils, topics)
-    EasyMock.replay(zkUtils.zkClient)
-    coordinatorMetadata.bindGroupToTopics(groupId, topics)
-    coordinatorMetadata.unbindGroupFromTopics(groupId, Set("b"))
-    assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
-  }
-
-  @Test
-  def testUnbindGroupFromTopicsListenedOnByOtherGroups() {
-    val group1 = "group1"
-    val group2 = "group2"
-    val topics = Set("a")
-    coordinatorMetadata.addGroup(group1, "range")
-    coordinatorMetadata.addGroup(group2, "range")
-
-    expectZkClientSubscribeDataChanges(zkUtils, topics)
-    EasyMock.replay(zkUtils.zkClient)
-    coordinatorMetadata.bindGroupToTopics(group1, topics)
-    coordinatorMetadata.bindGroupToTopics(group2, topics)
-    coordinatorMetadata.unbindGroupFromTopics(group1, topics)
-    assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
-  }
-
-  @Test
-  def testUnbindGroupFromTopicsListenedOnByNoOtherGroup() {
-    val groupId = "group"
-    val topics = Set("a")
-    coordinatorMetadata.addGroup(groupId, "range")
-
-    expectZkClientSubscribeDataChanges(zkUtils, topics)
-    expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics)
-    EasyMock.replay(zkUtils.zkClient)
-    coordinatorMetadata.bindGroupToTopics(groupId, topics)
-    coordinatorMetadata.unbindGroupFromTopics(groupId, topics)
-    assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
   def testRemoveNonexistentGroup() {
     val groupId = "group"
     val topics = Set("a")
-    coordinatorMetadata.removeGroup(groupId, topics)
-  }
-
-  @Test
-  def testRemoveGroupWithOtherGroupsBoundToItsTopics() {
-    val group1 = "group1"
-    val group2 = "group2"
-    val topics = Set("a")
-    coordinatorMetadata.addGroup(group1, "range")
-    coordinatorMetadata.addGroup(group2, "range")
-
-    expectZkClientSubscribeDataChanges(zkUtils, topics)
-    EasyMock.replay(zkUtils.zkClient)
-    coordinatorMetadata.bindGroupToTopics(group1, topics)
-    coordinatorMetadata.bindGroupToTopics(group2, topics)
-    coordinatorMetadata.removeGroup(group1, topics)
-    assertNull(coordinatorMetadata.getGroup(group1))
-    assertNotNull(coordinatorMetadata.getGroup(group2))
-    assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic)
-  }
-
-  @Test
-  def testRemoveGroupWithNoOtherGroupsBoundToItsTopics() {
-    val groupId = "group"
-    val topics = Set("a")
-    coordinatorMetadata.addGroup(groupId, "range")
-
-    expectZkClientSubscribeDataChanges(zkUtils, topics)
-    expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics)
-    EasyMock.replay(zkUtils.zkClient)
-    coordinatorMetadata.bindGroupToTopics(groupId, topics)
-    coordinatorMetadata.removeGroup(groupId, topics)
-    assertNull(coordinatorMetadata.getGroup(groupId))
-    assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic)
+    coordinatorMetadata.removeGroup(groupId)
   }
 
-  private def expectZkClientSubscribeDataChanges(zkUtils: ZkUtils, topics: Set[String]) {
-    topics.foreach(topic => expectZkClientSubscribeDataChange(zkUtils.zkClient, topic))
-  }
-
-  private def expectZkClientUnsubscribeDataChanges(zkClient: ZkClient, topics: Set[String]) {
-    topics.foreach(topic => expectZkClientUnsubscribeDataChange(zkClient, topic))
-  }
-
-  private def expectZkClientSubscribeDataChange(zkClient: ZkClient, topic: String) {
-    val replicaAssignment =
-      (0 until DefaultNumPartitions)
-      .map(partition => partition.toString -> (0 until DefaultNumReplicas).toSeq).toMap
-    val topicPath = getTopicPath(topic)
-    EasyMock.expect(zkClient.readData(topicPath, new Stat()))
-      .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment))
-    zkClient.subscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener]))
-  }
-
-  private def expectZkClientUnsubscribeDataChange(zkClient: ZkClient, topic: String) {
-    val topicPath = getTopicPath(topic)
-    zkClient.unsubscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener]))
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
new file mode 100644
index 0000000..cdd78ef
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -0,0 +1,907 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+
+import java.util.concurrent.TimeUnit
+
+import org.junit.Assert._
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.server.{OffsetManager, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
+import org.easymock.{IAnswer, EasyMock}
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future, Promise}
+
+/**
+ * Test GroupCoordinator responses
+ */
+class GroupCoordinatorResponseTest extends JUnitSuite {
+  type JoinGroupCallback = JoinGroupResult => Unit
+  type SyncGroupCallbackParams = (Array[Byte], Short)
+  type SyncGroupCallback = (Array[Byte], Short) => Unit
+  type HeartbeatCallbackParams = Short
+  type HeartbeatCallback = Short => Unit
+  type CommitOffsetCallbackParams = Map[TopicAndPartition, Short]
+  type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit
+  type LeaveGroupCallbackParams = Short
+  type LeaveGroupCallback = Short => Unit
+
+  val ConsumerMinSessionTimeout = 10
+  val ConsumerMaxSessionTimeout = 1000
+  val DefaultSessionTimeout = 500
+  var consumerCoordinator: GroupCoordinator = null
+  var offsetManager : OffsetManager = null
+
+  @Before
+  def setUp() {
+    val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
+    offsetManager = EasyMock.createStrictMock(classOf[OffsetManager])
+    consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager)
+    consumerCoordinator.startup()
+  }
+
+  @After
+  def tearDown() {
+    EasyMock.reset(offsetManager)
+    consumerCoordinator.shutdown()
+  }
+
+  @Test
+  def testJoinGroupWrongCoordinator() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
+      protocols, isCoordinatorForGroup = false)
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupSessionTimeoutTooSmall() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, ConsumerMinSessionTimeout - 1, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupSessionTimeoutTooLarge() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, ConsumerMaxSessionTimeout + 1, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupUnknownConsumerNewGroup() {
+    val groupId = "groupId"
+    val memberId = "memberId"
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testValidJoinGroup() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
+      protocols, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupInconsistentProtocolType() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+
+    val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, "consumer", List(("range", metadata)),
+      isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat",
+      List(("range", metadata)), isCoordinatorForGroup = true)
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
+  }
+
+  @Test
+  def testJoinGroupInconsistentGroupProtocol() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val protocolType = "consumer"
+    val metadata = Array[Byte]()
+
+    val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, List(("range", metadata)),
+      isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType,
+      List(("roundrobin", metadata)), isCoordinatorForGroup = true)
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
+  }
+
+  @Test
+  def testJoinGroupUnknownConsumerExistingGroup() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val otherMemberId = "memberId"
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, otherJoinGroupResult.errorCode)
+  }
+
+  @Test
+  def testHeartbeatWrongCoordinator() {
+    val groupId = "groupId"
+    val consumerId = "memberId"
+
+    val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = false)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatUnknownGroup() {
+    val groupId = "groupId"
+    val consumerId = "memberId"
+
+    val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = true)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatUnknownConsumerExistingGroup() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val otherMemberId = "memberId"
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.memberId, Map.empty, true)
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val heartbeatResult = heartbeat(groupId, otherMemberId, 1, isCoordinatorForGroup = true)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatRebalanceInProgress() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val assignedMemberId = joinGroupResult.memberId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val heartbeatResult = heartbeat(groupId, assignedMemberId, 2, isCoordinatorForGroup = true)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatIllegalGeneration() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val assignedMemberId = joinGroupResult.memberId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map.empty, true)
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val heartbeatResult = heartbeat(groupId, assignedMemberId, 2, isCoordinatorForGroup = true)
+    assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
+  }
+
+  @Test
+  def testValidHeartbeat() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map.empty, true)
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1, isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, heartbeatResult)
+  }
+
+  @Test
+  def testSyncGroupNotCoordinator() {
+    val groupId = "groupId"
+    val memberId = "member"
+    val generation = 1
+
+    val syncGroupResult = syncGroupFollower(groupId, generation, memberId, false)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, syncGroupResult._2)
+  }
+
+  @Test
+  def testSyncGroupFromUnknownGroup() {
+    val groupId = "groupId"
+    val memberId = "member"
+    val generation = 1
+
+    val syncGroupResult = syncGroupFollower(groupId, generation, memberId, true)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, syncGroupResult._2)
+  }
+
+  @Test
+  def testSyncGroupFromUnknownMember() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map.empty, true)
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val unknownMemberId = "blah"
+    val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId, true)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, unknownMemberSyncResult._2)
+  }
+
+  @Test
+  def testSyncGroupFromIllegalGeneration() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    // send the sync group with an invalid generation
+    val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map.empty, true)
+    assertEquals(Errors.ILLEGAL_GENERATION.code, syncGroupResult._2)
+  }
+
+  @Test
+  def testJoinGroupFromUnchangedFollowerDoesNotRebalance() {
+    val groupId = "groupId"
+    val protocolType = "consumer"
+    val protocols = List(("range", Array[Byte]()))
+
+    // to get a group of two members:
+    // 1. join and sync with a single member (because we can't immediately join with two members)
+    // 2. join and sync with the first member and a new member
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    assertEquals(Errors.NONE.code, firstSyncResult._2)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+
+    EasyMock.reset(offsetManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true);
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE.code, joinResult.errorCode)
+    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertTrue(joinResult.generationId == otherJoinResult.generationId)
+
+    assertEquals(firstMemberId, joinResult.leaderId)
+    assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+    val nextGenerationId = joinResult.generationId
+
+    // this shouldn't cause a rebalance since protocol information hasn't changed
+    EasyMock.reset(offsetManager)
+    val followerJoinResult = joinGroup(groupId, otherJoinResult.memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+
+    assertEquals(Errors.NONE.code, followerJoinResult.errorCode)
+    assertEquals(nextGenerationId, followerJoinResult.generationId)
+  }
+
+  @Test
+  def testJoinGroupFromUnchangedLeaderShouldRebalance() {
+    val groupId = "groupId"
+    val protocolType = "consumer"
+    val protocols = List(("range", Array[Byte]()))
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    assertEquals(Errors.NONE.code, firstSyncResult._2)
+
+    // join groups from the leader should force the group to rebalance, which allows the
+    // leader to push new assignments when local metadata changes
+
+    EasyMock.reset(offsetManager)
+    val secondJoinResult = joinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+
+    assertEquals(Errors.NONE.code, secondJoinResult.errorCode)
+    assertNotEquals(firstGenerationId, secondJoinResult.generationId)
+  }
+
+  @Test
+  def testLeaderFailureInSyncGroup() {
+    val groupId = "groupId"
+    val protocolType = "consumer"
+    val protocols = List(("range", Array[Byte]()))
+
+    // to get a group of two members:
+    // 1. join and sync with a single member (because we can't immediately join with two members)
+    // 2. join and sync with the first member and a new member
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    assertEquals(Errors.NONE.code, firstSyncResult._2)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+
+    EasyMock.reset(offsetManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true);
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE.code, joinResult.errorCode)
+    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertTrue(joinResult.generationId == otherJoinResult.generationId)
+
+    assertEquals(firstMemberId, joinResult.leaderId)
+    assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+    val nextGenerationId = joinResult.generationId
+
+    // with no leader SyncGroup, the follower's request should failure with an error indicating
+    // that it should rejoin
+    EasyMock.reset(offsetManager)
+    val followerSyncFuture= sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId,
+      isCoordinatorForGroup = true)
+    val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, followerSyncResult._2)
+  }
+
+  @Test
+  def testSyncGroupFollowerAfterLeader() {
+    val groupId = "groupId"
+    val protocolType = "consumer"
+    val protocols = List(("range", Array[Byte]()))
+
+    // to get a group of two members:
+    // 1. join and sync with a single member (because we can't immediately join with two members)
+    // 2. join and sync with the first member and a new member
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    assertEquals(Errors.NONE.code, firstSyncResult._2)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+
+    EasyMock.reset(offsetManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true);
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE.code, joinResult.errorCode)
+    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertTrue(joinResult.generationId == otherJoinResult.generationId)
+
+    assertEquals(firstMemberId, joinResult.leaderId)
+    assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+    val nextGenerationId = joinResult.generationId
+    val leaderId = firstMemberId
+    val leaderAssignment = Array[Byte](0)
+    val followerId = otherJoinResult.memberId
+    val followerAssignment = Array[Byte](1)
+
+    EasyMock.reset(offsetManager)
+    val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
+      Map(leaderId -> leaderAssignment, followerId -> followerAssignment), isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, leaderSyncResult._2)
+    assertEquals(leaderAssignment, leaderSyncResult._1)
+
+    EasyMock.reset(offsetManager)
+    val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId,
+      isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, followerSyncResult._2)
+    assertEquals(followerAssignment, followerSyncResult._1)
+  }
+
+  @Test
+  def testSyncGroupLeaderAfterFollower() {
+    val groupId = "groupId"
+    val protocolType = "consumer"
+    val protocols = List(("range", Array[Byte]()))
+
+    // to get a group of two members:
+    // 1. join and sync with a single member (because we can't immediately join with two members)
+    // 2. join and sync with the first member and a new member
+
+    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+    val firstMemberId = joinGroupResult.memberId
+    val firstGenerationId = joinGroupResult.generationId
+    assertEquals(firstMemberId, joinGroupResult.leaderId)
+    assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+    EasyMock.reset(offsetManager)
+    val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+
+    EasyMock.reset(offsetManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true);
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE.code, joinResult.errorCode)
+    assertEquals(Errors.NONE.code, otherJoinResult.errorCode)
+    assertTrue(joinResult.generationId == otherJoinResult.generationId)
+
+    val nextGenerationId = joinResult.generationId
+    val leaderId = joinResult.leaderId
+    val leaderAssignment = Array[Byte](0)
+    val followerId = otherJoinResult.memberId
+    val followerAssignment = Array[Byte](1)
+
+    assertEquals(firstMemberId, joinResult.leaderId)
+    assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+    EasyMock.reset(offsetManager)
+    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId, isCoordinatorForGroup = true)
+
+    EasyMock.reset(offsetManager)
+    val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
+      Map(leaderId -> leaderAssignment, followerId -> followerAssignment), isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, leaderSyncResult._2)
+    assertEquals(leaderAssignment, leaderSyncResult._1)
+
+    val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE.code, followerSyncResult._2)
+    assertEquals(followerAssignment, followerSyncResult._1)
+  }
+
+  @Test
+  def testCommitOffsetFromUnknownGroup() {
+    val groupId = "groupId"
+    val consumerId = "consumer"
+    val generationId = 1
+    val tp = new TopicAndPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+
+    val commitOffsetResult = commitOffsets(groupId, consumerId, generationId, Map(tp -> offset), true)
+    assertEquals(Errors.ILLEGAL_GENERATION.code, commitOffsetResult(tp))
+  }
+
+  @Test
+  def testCommitOffsetWithDefaultGeneration() {
+    val groupId = "groupId"
+    val tp = new TopicAndPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+
+    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset), true)
+    assertEquals(Errors.NONE.code, commitOffsetResult(tp))
+  }
+
+  @Test
+  def testCommitOffsetInAwaitingSync() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+    val tp = new TopicAndPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val assignedMemberId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset), true)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, commitOffsetResult(tp))
+  }
+
+  @Test
+  def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
+    val groupId = "groupId"
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
+    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
+      protocolType, protocols, isCoordinatorForGroup = true)
+    val assignedConsumerId = joinGroupResult.memberId
+    val initialGenerationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    // Then join with a new consumer to trigger a rebalance
+    EasyMock.reset(offsetManager)
+    sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+
+    // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress
+    EasyMock.reset(offsetManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
+  }
+
+  @Test
+  def testGenerationIdIncrementsOnRebalance() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val initialGenerationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(1, initialGenerationId)
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val nextGenerationId = otherJoinGroupResult.generationId
+    val otherJoinGroupErrorCode = otherJoinGroupResult.errorCode
+    assertEquals(2, nextGenerationId)
+    assertEquals(Errors.NONE.code, otherJoinGroupErrorCode)
+  }
+
+  @Test
+  def testLeaveGroupWrongCoordinator() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val leaveGroupResult = leaveGroup(groupId, memberId, isCoordinatorForGroup = false)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, leaveGroupResult)
+  }
+
+  @Test
+  def testLeaveGroupUnknownGroup() {
+    val groupId = "groupId"
+    val memberId = "consumerId"
+
+    val leaveGroupResult = leaveGroup(groupId, memberId, isCoordinatorForGroup = true)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, leaveGroupResult)
+  }
+
+  @Test
+  def testLeaveGroupUnknownConsumerExistingGroup() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val otherMemberId = "consumerId"
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val leaveGroupResult = leaveGroup(groupId, otherMemberId, isCoordinatorForGroup = true)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, leaveGroupResult)
+  }
+
+  @Test
+  def testValidLeaveGroup() {
+    val groupId = "groupId"
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val metadata = Array[Byte]()
+    val protocolType = "consumer"
+    val protocols = List(("range", metadata))
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
+      isCoordinatorForGroup = true)
+    val assignedMemberId = joinGroupResult.memberId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val leaveGroupResult = leaveGroup(groupId, assignedMemberId, isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, leaveGroupResult)
+  }
+
+  private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
+    val responsePromise = Promise[JoinGroupResult]
+    val responseFuture = responsePromise.future
+    val responseCallback: JoinGroupCallback = responsePromise.success(_)
+    (responseFuture, responseCallback)
+  }
+
+  private def setupSyncGroupCallback: (Future[SyncGroupCallbackParams], SyncGroupCallback) = {
+    val responsePromise = Promise[SyncGroupCallbackParams]
+    val responseFuture = responsePromise.future
+    val responseCallback: SyncGroupCallback = (assignment, errorCode) =>
+      responsePromise.success((assignment, errorCode))
+    (responseFuture, responseCallback)
+  }
+
+  private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = {
+    val responsePromise = Promise[HeartbeatCallbackParams]
+    val responseFuture = responsePromise.future
+    val responseCallback: HeartbeatCallback = errorCode => responsePromise.success(errorCode)
+    (responseFuture, responseCallback)
+  }
+
+  private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = {
+    val responsePromise = Promise[CommitOffsetCallbackParams]
+    val responseFuture = responsePromise.future
+    val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets)
+    (responseFuture, responseCallback)
+  }
+
+  private def sendJoinGroup(groupId: String,
+                            memberId: String,
+                            sessionTimeout: Int,
+                            protocolType: String,
+                            protocols: List[(String, Array[Byte])],
+                            isCoordinatorForGroup: Boolean): Future[JoinGroupResult] = {
+    val (responseFuture, responseCallback) = setupJoinGroupCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols, responseCallback)
+    responseFuture
+  }
+
+
+  private def sendSyncGroupLeader(groupId: String,
+                                  generation: Int,
+                                  leaderId: String,
+                                  assignment: Map[String, Array[Byte]],
+                                  isCoordinatorForGroup: Boolean): Future[SyncGroupCallbackParams] = {
+    val (responseFuture, responseCallback) = setupSyncGroupCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
+    responseFuture
+  }
+
+  private def sendSyncGroupFollower(groupId: String,
+                                    generation: Int,
+                                    memberId: String,
+                                    isCoordinatorForGroup: Boolean): Future[SyncGroupCallbackParams] = {
+    val (responseFuture, responseCallback) = setupSyncGroupCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
+    responseFuture
+  }
+
+  private def joinGroup(groupId: String,
+                        memberId: String,
+                        sessionTimeout: Int,
+                        protocolType: String,
+                        protocols: List[(String, Array[Byte])],
+                        isCoordinatorForGroup: Boolean): JoinGroupResult = {
+    val responseFuture = sendJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols, isCoordinatorForGroup)
+    // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
+    Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS))
+  }
+
+
+  private def syncGroupFollower(groupId: String,
+                                generationId: Int,
+                                memberId: String,
+                                isCoordinatorForGroup: Boolean): SyncGroupCallbackParams = {
+    val responseFuture = sendSyncGroupFollower(groupId, generationId, memberId, isCoordinatorForGroup)
+    Await.result(responseFuture, Duration(DefaultSessionTimeout+100, TimeUnit.MILLISECONDS))
+  }
+
+  private def syncGroupLeader(groupId: String,
+                              generationId: Int,
+                              memberId: String,
+                              assignment: Map[String, Array[Byte]],
+                              isCoordinatorForGroup: Boolean): SyncGroupCallbackParams = {
+    val responseFuture = sendSyncGroupLeader(groupId, generationId, memberId, assignment, isCoordinatorForGroup)
+    Await.result(responseFuture, Duration(DefaultSessionTimeout+100, TimeUnit.MILLISECONDS))
+  }
+
+  private def heartbeat(groupId: String,
+                        consumerId: String,
+                        generationId: Int,
+                        isCoordinatorForGroup: Boolean): HeartbeatCallbackParams = {
+    val (responseFuture, responseCallback) = setupHeartbeatCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+  }
+
+  private def await[T](future: Future[T], millis: Long): T = {
+    Await.result(future, Duration(millis, TimeUnit.MILLISECONDS))
+  }
+
+  private def commitOffsets(groupId: String,
+                            consumerId: String,
+                            generationId: Int,
+                            offsets: Map[TopicAndPartition, OffsetAndMetadata],
+                            isCoordinatorForGroup: Boolean): CommitOffsetCallbackParams = {
+    val (responseFuture, responseCallback) = setupCommitOffsetsCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    val storeOffsetAnswer = new IAnswer[Unit] {
+      override def answer = responseCallback.apply(offsets.mapValues(_ => Errors.NONE.code))
+    }
+    EasyMock.expect(offsetManager.storeOffsets(groupId, consumerId, generationId, offsets, responseCallback))
+      .andAnswer(storeOffsetAnswer)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+  }
+
+  private def leaveGroup(groupId: String, consumerId: String, isCoordinatorForGroup: Boolean): LeaveGroupCallbackParams = {
+    val (responseFuture, responseCallback) = setupHeartbeatCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+  }
+}


Mime
View raw message