kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/3] kafka git commit: KAFKA-2017: Persist Group Metadata and Assignment before Responding
Date Tue, 03 Nov 2015 07:36:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e466ccd71 -> 7c3347527


http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 5eaaea8..5e6bd03 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -18,18 +18,22 @@
 package kafka.coordinator
 
 
-import java.util.concurrent.TimeUnit
-
 import org.junit.Assert._
+
+import kafka.api.ProducerResponseStatus
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
-import kafka.server.{OffsetManager, KafkaConfig}
-import kafka.utils.TestUtils
+import kafka.message.MessageSet
+import kafka.server.{ReplicaManager, KafkaConfig}
+import kafka.utils._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
-import org.easymock.{IAnswer, EasyMock}
+import org.easymock.{Capture, IAnswer, EasyMock}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
 
+import java.util.concurrent.TimeUnit
+
+import scala.collection._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise}
 
@@ -51,76 +55,81 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   val ConsumerMaxSessionTimeout = 1000
   val DefaultSessionTimeout = 500
   var consumerCoordinator: GroupCoordinator = null
-  var offsetManager : OffsetManager = null
+  var replicaManager: ReplicaManager = null
+  var scheduler: KafkaScheduler = null
+  var zkUtils: ZkUtils = null
+
+  private val groupId = "groupId"
+  private val protocolType = "consumer"
+  private val memberId = "memberId"
+  private val metadata = Array[Byte]()
+  private val protocols = List(("range", metadata))
+  private var groupPartitionId: Int = -1
+
+  // we use this string value since its hashcode % #.partitions is different
+  private val otherGroupId = "otherGroup"
 
   @Before
   def setUp() {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
     props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
     props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
-    offsetManager = EasyMock.createStrictMock(classOf[OffsetManager])
-    consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager)
+
+    // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
+    val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
+    ret += (GroupCoordinator.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
+
+    replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
+
+    zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
+    EasyMock.replay(zkUtils)
+
+    consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
     consumerCoordinator.startup()
+
+    // add the partition into the owned partition list
+    groupPartitionId = consumerCoordinator.partitionFor(groupId)
+    consumerCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
   }
 
   @After
   def tearDown() {
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     consumerCoordinator.shutdown()
   }
 
   @Test
   def testJoinGroupWrongCoordinator() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
-      protocols, isCoordinatorForGroup = false)
+    val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType,
+      protocols)
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode)
   }
 
   @Test
   def testJoinGroupSessionTimeoutTooSmall() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, ConsumerMinSessionTimeout - 1, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, ConsumerMinSessionTimeout - 1, protocolType, protocols)
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
   }
 
   @Test
   def testJoinGroupSessionTimeoutTooLarge() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, ConsumerMaxSessionTimeout + 1, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, ConsumerMaxSessionTimeout + 1, protocolType, protocols)
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
   }
 
   @Test
   def testJoinGroupUnknownConsumerNewGroup() {
-    val groupId = "groupId"
-    val memberId = "memberId"
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
-
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.UNKNOWN_MEMBER_ID.code, joinGroupErrorCode)
   }
@@ -129,288 +138,226 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   def testInvalidGroupId() {
     val groupId = ""
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
     val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
-      protocols, isCoordinatorForGroup = true)
+      protocols)
     assertEquals(Errors.INVALID_GROUP_ID.code, joinGroupResult.errorCode)
   }
 
   @Test
   def testValidJoinGroup() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
     val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
-      protocols, isCoordinatorForGroup = true)
+      protocols)
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
   }
 
   @Test
   def testJoinGroupInconsistentProtocolType() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-
     val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, "consumer", List(("range", metadata)),
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
+      protocols)
     assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat",
-      List(("range", metadata)), isCoordinatorForGroup = true)
+      protocols)
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
   }
 
   @Test
   def testJoinGroupInconsistentGroupProtocol() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val protocolType = "consumer"
-    val metadata = Array[Byte]()
 
     val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, List(("range", metadata)),
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, List(("range", metadata)))
     assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType,
-      List(("roundrobin", metadata)), isCoordinatorForGroup = true)
+      List(("roundrobin", metadata)))
     assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
   }
 
   @Test
   def testJoinGroupUnknownConsumerExistingGroup() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "memberId"
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
 
-    EasyMock.reset(offsetManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols)
     assertEquals(Errors.UNKNOWN_MEMBER_ID.code, otherJoinGroupResult.errorCode)
   }
 
   @Test
   def testHeartbeatWrongCoordinator() {
-    val groupId = "groupId"
-    val consumerId = "memberId"
 
-    val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = false)
+    val heartbeatResult = heartbeat(otherGroupId, memberId, -1)
     assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, heartbeatResult)
   }
 
   @Test
   def testHeartbeatUnknownGroup() {
-    val groupId = "groupId"
-    val consumerId = "memberId"
 
-    val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = true)
+    val heartbeatResult = heartbeat(groupId, memberId, -1)
     assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
   }
 
   @Test
   def testHeartbeatUnknownConsumerExistingGroup() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "memberId"
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.memberId, Map.empty, true)
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
     val syncGroupErrorCode = syncGroupResult._2
     assertEquals(Errors.NONE.code, syncGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val heartbeatResult = heartbeat(groupId, otherMemberId, 1, isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, otherMemberId, 1)
     assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
   }
 
   @Test
   def testHeartbeatRebalanceInProgress() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val heartbeatResult = heartbeat(groupId, assignedMemberId, 2, isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedMemberId, 2)
     assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
   }
 
   @Test
   def testHeartbeatIllegalGeneration() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map.empty, true)
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
     val syncGroupErrorCode = syncGroupResult._2
     assertEquals(Errors.NONE.code, syncGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val heartbeatResult = heartbeat(groupId, assignedMemberId, 2, isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedMemberId, 2)
     assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
   }
 
   @Test
   def testValidHeartbeat() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map.empty, true)
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
     val syncGroupErrorCode = syncGroupResult._2
     assertEquals(Errors.NONE.code, syncGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1, isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
     assertEquals(Errors.NONE.code, heartbeatResult)
   }
 
   @Test
   def testSyncGroupNotCoordinator() {
-    val groupId = "groupId"
-    val memberId = "member"
     val generation = 1
 
-    val syncGroupResult = syncGroupFollower(groupId, generation, memberId, false)
+    val syncGroupResult = syncGroupFollower(otherGroupId, generation, memberId)
     assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, syncGroupResult._2)
   }
 
   @Test
   def testSyncGroupFromUnknownGroup() {
-    val groupId = "groupId"
-    val memberId = "member"
     val generation = 1
 
-    val syncGroupResult = syncGroupFollower(groupId, generation, memberId, true)
+    val syncGroupResult = syncGroupFollower(groupId, generation, memberId)
     assertEquals(Errors.UNKNOWN_MEMBER_ID.code, syncGroupResult._2)
   }
 
   @Test
   def testSyncGroupFromUnknownMember() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
 
-    EasyMock.reset(offsetManager)
-    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map.empty, true)
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
     val syncGroupErrorCode = syncGroupResult._2
     assertEquals(Errors.NONE.code, syncGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     val unknownMemberId = "blah"
-    val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId, true)
+    val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId)
     assertEquals(Errors.UNKNOWN_MEMBER_ID.code, unknownMemberSyncResult._2)
   }
 
   @Test
   def testSyncGroupFromIllegalGeneration() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     // send the sync group with an invalid generation
-    val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map.empty, true)
+    val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
     assertEquals(Errors.ILLEGAL_GENERATION.code, syncGroupResult._2)
   }
 
   @Test
   def testJoinGroupFromUnchangedFollowerDoesNotRebalance() {
-    val groupId = "groupId"
-    val protocolType = "consumer"
-    val protocols = List(("range", Array[Byte]()))
-
     // to get a group of two members:
     // 1. join and sync with a single member (because we can't immediately join with two members)
     // 2. join and sync with the first member and a new member
 
     val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
     assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
 
-    EasyMock.reset(offsetManager)
-    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
     assertEquals(Errors.NONE.code, firstSyncResult._2)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
 
-    EasyMock.reset(offsetManager)
-    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true);
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols)
 
     val joinResult = await(joinFuture, DefaultSessionTimeout+100)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
@@ -424,9 +371,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val nextGenerationId = joinResult.generationId
 
     // this shouldn't cause a rebalance since protocol information hasn't changed
-    EasyMock.reset(offsetManager)
-    val followerJoinResult = joinGroup(groupId, otherJoinResult.memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val followerJoinResult = joinGroup(groupId, otherJoinResult.memberId, DefaultSessionTimeout, protocolType, protocols)
 
     assertEquals(Errors.NONE.code, followerJoinResult.errorCode)
     assertEquals(nextGenerationId, followerJoinResult.generationId)
@@ -434,27 +380,22 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
   @Test
   def testJoinGroupFromUnchangedLeaderShouldRebalance() {
-    val groupId = "groupId"
-    val protocolType = "consumer"
-    val protocols = List(("range", Array[Byte]()))
-
     val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
     assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
 
-    EasyMock.reset(offsetManager)
-    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
     assertEquals(Errors.NONE.code, firstSyncResult._2)
 
     // join groups from the leader should force the group to rebalance, which allows the
     // leader to push new assignments when local metadata changes
 
-    EasyMock.reset(offsetManager)
-    val secondJoinResult = joinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val secondJoinResult = joinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols)
 
     assertEquals(Errors.NONE.code, secondJoinResult.errorCode)
     assertNotEquals(firstGenerationId, secondJoinResult.generationId)
@@ -462,32 +403,27 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
   @Test
   def testLeaderFailureInSyncGroup() {
-    val groupId = "groupId"
-    val protocolType = "consumer"
-    val protocols = List(("range", Array[Byte]()))
-
     // to get a group of two members:
     // 1. join and sync with a single member (because we can't immediately join with two members)
     // 2. join and sync with the first member and a new member
 
     val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
     assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
 
-    EasyMock.reset(offsetManager)
-    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
     assertEquals(Errors.NONE.code, firstSyncResult._2)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
 
-    EasyMock.reset(offsetManager)
-    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true);
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols)
 
     val joinResult = await(joinFuture, DefaultSessionTimeout+100)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
@@ -502,41 +438,35 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     // with no leader SyncGroup, the follower's request should failure with an error indicating
     // that it should rejoin
-    EasyMock.reset(offsetManager)
-    val followerSyncFuture= sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId,
-      isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val followerSyncFuture= sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
     val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
     assertEquals(Errors.REBALANCE_IN_PROGRESS.code, followerSyncResult._2)
   }
 
   @Test
   def testSyncGroupFollowerAfterLeader() {
-    val groupId = "groupId"
-    val protocolType = "consumer"
-    val protocols = List(("range", Array[Byte]()))
-
     // to get a group of two members:
     // 1. join and sync with a single member (because we can't immediately join with two members)
     // 2. join and sync with the first member and a new member
 
     val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
     val firstMemberId = firstJoinResult.memberId
     val firstGenerationId = firstJoinResult.generationId
     assertEquals(firstMemberId, firstJoinResult.leaderId)
     assertEquals(Errors.NONE.code, firstJoinResult.errorCode)
 
-    EasyMock.reset(offsetManager)
-    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
     assertEquals(Errors.NONE.code, firstSyncResult._2)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
 
-    EasyMock.reset(offsetManager)
-    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true);
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols)
 
     val joinResult = await(joinFuture, DefaultSessionTimeout+100)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
@@ -553,48 +483,42 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val followerId = otherJoinResult.memberId
     val followerAssignment = Array[Byte](1)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
-      Map(leaderId -> leaderAssignment, followerId -> followerAssignment), isCoordinatorForGroup = true)
+      Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
     assertEquals(Errors.NONE.code, leaderSyncResult._2)
     assertEquals(leaderAssignment, leaderSyncResult._1)
 
-    EasyMock.reset(offsetManager)
-    val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId,
-      isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
     assertEquals(Errors.NONE.code, followerSyncResult._2)
     assertEquals(followerAssignment, followerSyncResult._1)
   }
 
   @Test
   def testSyncGroupLeaderAfterFollower() {
-    val groupId = "groupId"
-    val protocolType = "consumer"
-    val protocols = List(("range", Array[Byte]()))
-
     // to get a group of two members:
     // 1. join and sync with a single member (because we can't immediately join with two members)
     // 2. join and sync with the first member and a new member
 
     val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
     val firstMemberId = joinGroupResult.memberId
     val firstGenerationId = joinGroupResult.generationId
     assertEquals(firstMemberId, joinGroupResult.leaderId)
     assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
 
-    EasyMock.reset(offsetManager)
-    val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true)
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
     val syncGroupErrorCode = syncGroupResult._2
     assertEquals(Errors.NONE.code, syncGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
 
-    EasyMock.reset(offsetManager)
-    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true);
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols)
 
     val joinResult = await(joinFuture, DefaultSessionTimeout+100)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
@@ -611,12 +535,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     assertEquals(firstMemberId, joinResult.leaderId)
     assertEquals(firstMemberId, otherJoinResult.leaderId)
 
-    EasyMock.reset(offsetManager)
-    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId, isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId)
 
-    EasyMock.reset(offsetManager)
+    EasyMock.reset(replicaManager)
     val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
-      Map(leaderId -> leaderAssignment, followerId -> followerAssignment), isCoordinatorForGroup = true)
+      Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
     assertEquals(Errors.NONE.code, leaderSyncResult._2)
     assertEquals(leaderAssignment, leaderSyncResult._1)
 
@@ -627,94 +551,74 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
   @Test
   def testCommitOffsetFromUnknownGroup() {
-    val groupId = "groupId"
-    val consumerId = "consumer"
     val generationId = 1
     val tp = new TopicAndPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
-    val commitOffsetResult = commitOffsets(groupId, consumerId, generationId, Map(tp -> offset), true)
+    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp -> offset))
     assertEquals(Errors.ILLEGAL_GENERATION.code, commitOffsetResult(tp))
   }
 
   @Test
   def testCommitOffsetWithDefaultGeneration() {
-    val groupId = "groupId"
     val tp = new TopicAndPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset), true)
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
     assertEquals(Errors.NONE.code, commitOffsetResult(tp))
   }
 
   @Test
   def testCommitOffsetInAwaitingSync() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
     val tp = new TopicAndPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val generationId = joinGroupResult.generationId
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset), true)
+    EasyMock.reset(replicaManager)
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, immutable.Map(tp -> offset))
     assertEquals(Errors.REBALANCE_IN_PROGRESS.code, commitOffsetResult(tp))
   }
 
   @Test
   def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
-    val groupId = "groupId"
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
-
     // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
     val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout,
-      protocolType, protocols, isCoordinatorForGroup = true)
+      protocolType, protocols)
     val assignedConsumerId = joinGroupResult.memberId
     val initialGenerationId = joinGroupResult.generationId
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
     // Then join with a new consumer to trigger a rebalance
-    EasyMock.reset(offsetManager)
-    sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, protocolType, protocols)
 
     // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress
-    EasyMock.reset(offsetManager)
-    val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId)
     assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult)
   }
 
   @Test
   def testGenerationIdIncrementsOnRebalance() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val initialGenerationId = joinGroupResult.generationId
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(1, initialGenerationId)
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols)
     val nextGenerationId = otherJoinGroupResult.generationId
     val otherJoinGroupErrorCode = otherJoinGroupResult.errorCode
     assertEquals(2, nextGenerationId)
@@ -723,57 +627,44 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
   @Test
   def testLeaveGroupWrongCoordinator() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
-    val leaveGroupResult = leaveGroup(groupId, memberId, isCoordinatorForGroup = false)
+    val leaveGroupResult = leaveGroup(otherGroupId, memberId)
     assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, leaveGroupResult)
   }
 
   @Test
   def testLeaveGroupUnknownGroup() {
-    val groupId = "groupId"
-    val memberId = "consumerId"
 
-    val leaveGroupResult = leaveGroup(groupId, memberId, isCoordinatorForGroup = true)
+    val leaveGroupResult = leaveGroup(groupId, memberId)
     assertEquals(Errors.UNKNOWN_MEMBER_ID.code, leaveGroupResult)
   }
 
   @Test
   def testLeaveGroupUnknownConsumerExistingGroup() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val otherMemberId = "consumerId"
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val leaveGroupResult = leaveGroup(groupId, otherMemberId, isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val leaveGroupResult = leaveGroup(groupId, otherMemberId)
     assertEquals(Errors.UNKNOWN_MEMBER_ID.code, leaveGroupResult)
   }
 
   @Test
   def testValidLeaveGroup() {
-    val groupId = "groupId"
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val metadata = Array[Byte]()
-    val protocolType = "consumer"
-    val protocols = List(("range", metadata))
 
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols,
-      isCoordinatorForGroup = true)
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
     val joinGroupErrorCode = joinGroupResult.errorCode
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
-    EasyMock.reset(offsetManager)
-    val leaveGroupResult = leaveGroup(groupId, assignedMemberId, isCoordinatorForGroup = true)
+    EasyMock.reset(replicaManager)
+    val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
     assertEquals(Errors.NONE.code, leaveGroupResult)
   }
 
@@ -810,13 +701,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
                             memberId: String,
                             sessionTimeout: Int,
                             protocolType: String,
-                            protocols: List[(String, Array[Byte])],
-                            isCoordinatorForGroup: Boolean): Future[JoinGroupResult] = {
+                            protocols: List[(String, Array[Byte])]): Future[JoinGroupResult] = {
     val (responseFuture, responseCallback) = setupJoinGroupCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    EasyMock.replay(offsetManager)
-    consumerCoordinator.handleJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols, responseCallback)
+
+    EasyMock.replay(replicaManager)
+
+    consumerCoordinator.handleJoinGroup(groupId, memberId, "clientId", sessionTimeout, protocolType, protocols, responseCallback)
     responseFuture
   }
 
@@ -824,24 +714,34 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   private def sendSyncGroupLeader(groupId: String,
                                   generation: Int,
                                   leaderId: String,
-                                  assignment: Map[String, Array[Byte]],
-                                  isCoordinatorForGroup: Boolean): Future[SyncGroupCallbackParams] = {
+                                  assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    EasyMock.replay(offsetManager)
+
+    val capturedArgument: Capture[Map[TopicAndPartition, ProducerResponseStatus] => Unit] = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      EasyMock.anyBoolean(),
+      EasyMock.anyObject().asInstanceOf[Map[TopicAndPartition, MessageSet]],
+      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
+      override def answer = capturedArgument.getValue.apply(
+        Map(TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+          new ProducerResponseStatus(Errors.NONE.code, 0L)
+        )
+      )})
+    EasyMock.replay(replicaManager)
+
     consumerCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
     responseFuture
   }
 
   private def sendSyncGroupFollower(groupId: String,
                                     generation: Int,
-                                    memberId: String,
-                                    isCoordinatorForGroup: Boolean): Future[SyncGroupCallbackParams] = {
+                                    memberId: String): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    EasyMock.replay(offsetManager)
+
+    EasyMock.replay(replicaManager)
+
     consumerCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
     responseFuture
   }
@@ -850,9 +750,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
                         memberId: String,
                         sessionTimeout: Int,
                         protocolType: String,
-                        protocols: List[(String, Array[Byte])],
-                        isCoordinatorForGroup: Boolean): JoinGroupResult = {
-    val responseFuture = sendJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols, isCoordinatorForGroup)
+                        protocols: List[(String, Array[Byte])]): JoinGroupResult = {
+    val responseFuture = sendJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols)
     // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
     Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS))
   }
@@ -860,29 +759,26 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
   private def syncGroupFollower(groupId: String,
                                 generationId: Int,
-                                memberId: String,
-                                isCoordinatorForGroup: Boolean): SyncGroupCallbackParams = {
-    val responseFuture = sendSyncGroupFollower(groupId, generationId, memberId, isCoordinatorForGroup)
+                                memberId: String): SyncGroupCallbackParams = {
+    val responseFuture = sendSyncGroupFollower(groupId, generationId, memberId)
     Await.result(responseFuture, Duration(DefaultSessionTimeout+100, TimeUnit.MILLISECONDS))
   }
 
   private def syncGroupLeader(groupId: String,
                               generationId: Int,
                               memberId: String,
-                              assignment: Map[String, Array[Byte]],
-                              isCoordinatorForGroup: Boolean): SyncGroupCallbackParams = {
-    val responseFuture = sendSyncGroupLeader(groupId, generationId, memberId, assignment, isCoordinatorForGroup)
+                              assignment: Map[String, Array[Byte]]): SyncGroupCallbackParams = {
+    val responseFuture = sendSyncGroupLeader(groupId, generationId, memberId, assignment)
     Await.result(responseFuture, Duration(DefaultSessionTimeout+100, TimeUnit.MILLISECONDS))
   }
 
   private def heartbeat(groupId: String,
                         consumerId: String,
-                        generationId: Int,
-                        isCoordinatorForGroup: Boolean): HeartbeatCallbackParams = {
+                        generationId: Int): HeartbeatCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    EasyMock.replay(offsetManager)
+
+    EasyMock.replay(replicaManager)
+
     consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
@@ -894,26 +790,33 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   private def commitOffsets(groupId: String,
                             consumerId: String,
                             generationId: Int,
-                            offsets: Map[TopicAndPartition, OffsetAndMetadata],
-                            isCoordinatorForGroup: Boolean): CommitOffsetCallbackParams = {
+                            offsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    val storeOffsetAnswer = new IAnswer[Unit] {
-      override def answer = responseCallback.apply(offsets.mapValues(_ => Errors.NONE.code))
-    }
-    EasyMock.expect(offsetManager.storeOffsets(groupId, consumerId, generationId, offsets, responseCallback))
-      .andAnswer(storeOffsetAnswer)
-    EasyMock.replay(offsetManager)
+
+    val capturedArgument: Capture[Map[TopicAndPartition, ProducerResponseStatus] => Unit] = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      EasyMock.anyBoolean(),
+      EasyMock.anyObject().asInstanceOf[Map[TopicAndPartition, MessageSet]],
+      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
+      override def answer = capturedArgument.getValue.apply(
+        Map(TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+          new ProducerResponseStatus(Errors.NONE.code, 0L)
+        )
+      )})
+    EasyMock.replay(replicaManager)
+
     consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
-  private def leaveGroup(groupId: String, consumerId: String, isCoordinatorForGroup: Boolean): LeaveGroupCallbackParams = {
+  private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
-    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
-    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
-    EasyMock.replay(offsetManager)
+
+    EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
+    EasyMock.replay(replicaManager)
+
     consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 0f3e748..021aea6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -97,11 +97,6 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test(expected = classOf[IllegalStateException])
-  def testStableToDeadIllegalTransition() {
-    group.transitionTo(Dead)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
   def testPreparingRebalanceToPreparingRebalanceIllegalTransition() {
     group.transitionTo(PreparingRebalance)
     group.transitionTo(PreparingRebalance)
@@ -121,13 +116,6 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test(expected = classOf[IllegalStateException])
-  def testAwaitingSyncToDeadIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
-    group.transitionTo(Dead)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
   def testDeadToDeadIllegalTransition() {
     group.transitionTo(PreparingRebalance)
     group.transitionTo(Dead)


Mime
View raw message