kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-2720: expire group metadata when all offsets have expired
Date Thu, 16 Jun 2016 02:47:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fb42558e2 -> 8c551675a


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
new file mode 100644
index 0000000..0bd6d71
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -0,0 +1,407 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.coordinator
+
+import kafka.cluster.Partition
+import kafka.common.OffsetAndMetadata
+import kafka.log.LogAppendInfo
+import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
+import kafka.server.{KafkaConfig, ReplicaManager}
+import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.TopicConstants
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.requests.OffsetFetchResponse
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.easymock.{Capture, EasyMock, IAnswer}
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+import scala.collection._
+
+class GroupMetadataManagerTest {
+
+  var time: MockTime = null
+  var replicaManager: ReplicaManager = null
+  var groupMetadataManager: GroupMetadataManager = null
+  var scheduler: KafkaScheduler = null
+  var zkUtils: ZkUtils = null
+  var partition: Partition = null
+
+  val groupId = "foo"
+  val groupPartitionId = 0
+  val protocolType = "protocolType"
+  val sessionTimeout = 30000
+
+
+  @Before
+  def setUp() {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect
= ""))
+
+    val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+      loadBufferSize = config.offsetsLoadBufferSize,
+      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
+      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+      offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
+      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+      offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
+      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+
+    // make two partitions of the group topic to make sure some partitions are not owned
by the coordinator
+    val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
+    ret += (TopicConstants.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1)))
+
+    zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
+    EasyMock.replay(zkUtils)
+
+    time = new MockTime
+    replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
+    groupMetadataManager = new GroupMetadataManager(0, offsetConfig, replicaManager, zkUtils,
time)
+    partition = EasyMock.niceMock(classOf[Partition])
+
+  }
+
+  @After
+  def tearDown() {
+    EasyMock.reset(replicaManager)
+    EasyMock.reset(partition)
+  }
+
+  @Test
+  def testAddGroup() {
+    val group = new GroupMetadata("foo")
+    assertEquals(group, groupMetadataManager.addGroup(group))
+    assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo")))
+  }
+
+  @Test
+  def testStoreEmptyGroup() {
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    expectAppendMessage(Errors.NONE)
+    EasyMock.replay(replicaManager)
+
+    var errorCode: Option[Short] = None
+    def callback(error: Short) {
+      errorCode = Some(error)
+    }
+
+    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
+    groupMetadataManager.store(delayedStore)
+    assertEquals(Errors.NONE.code, errorCode.get)
+  }
+
+  @Test
+  def testStoreNonEmptyGroup() {
+    val memberId = "memberId"
+    val clientId = "clientId"
+    val clientHost = "localhost"
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
+      protocolType, List(("protocol", Array[Byte]())))
+    member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+    group.add(memberId, member)
+    group.transitionTo(PreparingRebalance)
+    group.initNextGeneration()
+
+    expectAppendMessage(Errors.NONE)
+    EasyMock.replay(replicaManager)
+
+    var errorCode: Option[Short] = None
+    def callback(error: Short) {
+      errorCode = Some(error)
+    }
+
+    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()),
callback)
+    groupMetadataManager.store(delayedStore)
+    assertEquals(Errors.NONE.code, errorCode.get)
+  }
+
+  @Test
+  def testCommitOffset() {
+    val memberId = ""
+    val generationId = -1
+    val topicPartition = new TopicPartition("foo", 0)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+
+    expectAppendMessage(Errors.NONE)
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
+    def callback(errors: immutable.Map[TopicPartition, Short]) {
+      commitErrors = Some(errors)
+    }
+
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId,
offsets, callback)
+    assertTrue(group.hasOffsets)
+
+    groupMetadataManager.store(delayedStore)
+
+    assertFalse(commitErrors.isEmpty)
+    val maybeError = commitErrors.get.get(topicPartition)
+    assertEquals(Some(Errors.NONE.code), maybeError)
+    assertTrue(group.hasOffsets)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition))
+    val maybePartitionResponse = cachedOffsets.get(topicPartition)
+    assertFalse(maybePartitionResponse.isEmpty)
+
+    val partitionResponse = maybePartitionResponse.get
+    assertEquals(Errors.NONE.code, partitionResponse.errorCode)
+    assertEquals(offset, partitionResponse.offset)
+  }
+
+  @Test
+  def testCommitOffsetFailure() {
+    val memberId = ""
+    val generationId = -1
+    val topicPartition = new TopicPartition("foo", 0)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+
+    expectAppendMessage(Errors.NOT_LEADER_FOR_PARTITION)
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
+    def callback(errors: immutable.Map[TopicPartition, Short]) {
+      commitErrors = Some(errors)
+    }
+
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId,
offsets, callback)
+    assertTrue(group.hasOffsets)
+
+    groupMetadataManager.store(delayedStore)
+
+    assertFalse(commitErrors.isEmpty)
+    val maybeError = commitErrors.get.get(topicPartition)
+    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP.code), maybeError)
+    assertFalse(group.hasOffsets)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
+  }
+
+  @Test
+  def testExpireOffset() {
+    val memberId = ""
+    val generationId = -1
+    val topicPartition1 = new TopicPartition("foo", 0)
+    val topicPartition2 = new TopicPartition("foo", 1)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    // expire the offset after 1 millisecond
+    val startMs = time.milliseconds
+    val offsets = immutable.Map(
+      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
+      topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
+
+    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME,
groupPartitionId)).andStubReturn(Some(partition))
+    expectAppendMessage(Errors.NONE)
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
+    def callback(errors: immutable.Map[TopicPartition, Short]) {
+      commitErrors = Some(errors)
+    }
+
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId,
offsets, callback)
+    assertTrue(group.hasOffsets)
+
+    groupMetadataManager.store(delayedStore)
+    assertFalse(commitErrors.isEmpty)
+    assertEquals(Some(Errors.NONE.code), commitErrors.get.get(topicPartition1))
+
+    // expire only one of the offsets
+    time.sleep(2)
+
+    EasyMock.reset(partition)
+    EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]),
EasyMock.anyInt()))
+      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+    EasyMock.replay(partition)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+    assertEquals(None, group.offset(topicPartition1))
+    assertEquals(Some(offset), group.offset(topicPartition2).map(_.offset))
+
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset))
+  }
+
+  @Test
+  def testExpireGroup() {
+    val memberId = ""
+    val generationId = -1
+    val topicPartition1 = new TopicPartition("foo", 0)
+    val topicPartition2 = new TopicPartition("foo", 1)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    // expire the offset after 1 millisecond
+    val startMs = time.milliseconds
+    val offsets = immutable.Map(
+      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
+      topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
+
+    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME,
groupPartitionId)).andStubReturn(Some(partition))
+    expectAppendMessage(Errors.NONE)
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
+    def callback(errors: immutable.Map[TopicPartition, Short]) {
+      commitErrors = Some(errors)
+    }
+
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId,
offsets, callback)
+    assertTrue(group.hasOffsets)
+
+    groupMetadataManager.store(delayedStore)
+    assertFalse(commitErrors.isEmpty)
+    assertEquals(Some(Errors.NONE.code), commitErrors.get.get(topicPartition1))
+
+    // expire all of the offsets
+    time.sleep(4)
+
+    // expect the offset tombstone
+    EasyMock.reset(partition)
+    EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]),
EasyMock.anyInt()))
+      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+    EasyMock.replay(partition)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    // the full group should be gone since all offsets were removed
+    assertEquals(None, groupMetadataManager.getGroup(groupId))
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+  }
+
+  @Test
+  def testExpireOffsetsWithActiveGroup() {
+    val memberId = "memberId"
+    val clientId = "clientId"
+    val clientHost = "localhost"
+    val topicPartition1 = new TopicPartition("foo", 0)
+    val topicPartition2 = new TopicPartition("foo", 1)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId)
+    groupMetadataManager.addGroup(group)
+
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
+      protocolType, List(("protocol", Array[Byte]())))
+    member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+    group.add(memberId, member)
+    group.transitionTo(PreparingRebalance)
+    group.initNextGeneration()
+
+    // expire the offset after 1 millisecond
+    val startMs = time.milliseconds
+    val offsets = immutable.Map(
+      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
+      topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
+
+    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME,
groupPartitionId)).andStubReturn(Some(partition))
+    expectAppendMessage(Errors.NONE)
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
+    def callback(errors: immutable.Map[TopicPartition, Short]) {
+      commitErrors = Some(errors)
+    }
+
+    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, group.generationId,
offsets, callback)
+    assertTrue(group.hasOffsets)
+
+    groupMetadataManager.store(delayedStore)
+    assertFalse(commitErrors.isEmpty)
+    assertEquals(Some(Errors.NONE.code), commitErrors.get.get(topicPartition1))
+
+    // expire all of the offsets
+    time.sleep(4)
+
+    // expect the offset tombstone
+    EasyMock.reset(partition)
+    EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]),
EasyMock.anyInt()))
+      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+    EasyMock.replay(partition)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    // group should still be there, but the offsets should be gone
+    assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+    assertEquals(None, group.offset(topicPartition1))
+    assertEquals(None, group.offset(topicPartition2))
+
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+  }
+
+  private def expectAppendMessage(error: Errors) {
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      EasyMock.anyBoolean(),
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
+      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
+      override def answer = capturedArgument.getValue.apply(
+        Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
->
+          new PartitionResponse(error.code, 0L, Record.NO_TIMESTAMP)
+        )
+      )})
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 2846622..18dd143 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.coordinator
 
+import kafka.common.OffsetAndMetadata
+import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -29,7 +31,7 @@ class GroupMetadataTest extends JUnitSuite {
 
   @Before
   def setUp() {
-    group = new GroupMetadata("groupId", "consumer")
+    group = new GroupMetadata("groupId")
   }
 
   @Test
@@ -53,6 +55,7 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testCannotRebalanceWhenDead() {
     group.transitionTo(PreparingRebalance)
+    group.transitionTo(Empty)
     group.transitionTo(Dead)
     assertFalse(group.canRebalance)
   }
@@ -64,6 +67,12 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test
+  def testStableToDeadTransition() {
+    group.transitionTo(Dead)
+    assertState(group, Dead)
+  }
+
+  @Test
   def testAwaitingSyncToPreparingRebalanceTransition() {
     group.transitionTo(PreparingRebalance)
     group.transitionTo(AwaitingSync)
@@ -79,6 +88,21 @@ class GroupMetadataTest extends JUnitSuite {
   }
 
   @Test
+  def testPreparingRebalanceToEmptyTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Empty)
+    assertState(group, Empty)
+  }
+
+  @Test
+  def testEmptyToDeadTransition() {
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Empty)
+    group.transitionTo(Dead)
+    assertState(group, Dead)
+  }
+
+  @Test
   def testAwaitingSyncToStableTransition() {
     group.transitionTo(PreparingRebalance)
     group.transitionTo(AwaitingSync)
@@ -115,11 +139,11 @@ class GroupMetadataTest extends JUnitSuite {
     group.transitionTo(AwaitingSync)
   }
 
-  @Test(expected = classOf[IllegalStateException])
   def testDeadToDeadIllegalTransition() {
     group.transitionTo(PreparingRebalance)
     group.transitionTo(Dead)
     group.transitionTo(Dead)
+    assertState(group, Dead)
   }
 
   @Test(expected = classOf[IllegalStateException])
@@ -145,6 +169,7 @@ class GroupMetadataTest extends JUnitSuite {
 
   @Test
   def testSelectProtocol() {
+    val protocolType = "consumer"
     val groupId = "groupId"
     val clientId = "clientId"
     val clientHost = "clientHost"
@@ -152,14 +177,14 @@ class GroupMetadataTest extends JUnitSuite {
 
     val memberId = "memberId"
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
-      List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
+      protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     group.add(memberId, member)
     assertEquals("range", group.selectProtocol)
 
     val otherMemberId = "otherMemberId"
     val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
-      List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
+      protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
     group.add(otherMemberId, otherMember)
     // now could be either range or robin since there is no majority preference
@@ -167,7 +192,7 @@ class GroupMetadataTest extends JUnitSuite {
 
     val lastMemberId = "lastMemberId"
     val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
-      List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
+      protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
     group.add(lastMemberId, lastMember)
     // now we should prefer 'roundrobin'
@@ -182,6 +207,7 @@ class GroupMetadataTest extends JUnitSuite {
 
   @Test
   def testSelectProtocolChoosesCompatibleProtocol() {
+    val protocolType = "consumer"
     val groupId = "groupId"
     val clientId = "clientId"
     val clientHost = "clientHost"
@@ -189,11 +215,11 @@ class GroupMetadataTest extends JUnitSuite {
 
     val memberId = "memberId"
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
-      List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
+      protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     val otherMemberId = "otherMemberId"
     val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
-      List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
+      protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
     group.add(memberId, member)
     group.add(otherMemberId, otherMember)
@@ -202,6 +228,7 @@ class GroupMetadataTest extends JUnitSuite {
 
   @Test
   def testSupportsProtocols() {
+    val protocolType = "consumer"
     val groupId = "groupId"
     val clientId = "clientId"
     val clientHost = "clientHost"
@@ -212,7 +239,7 @@ class GroupMetadataTest extends JUnitSuite {
 
     val memberId = "memberId"
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
-      List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
+      protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
     group.add(memberId, member)
     assertTrue(group.supportsProtocols(Set("roundrobin", "foo")))
@@ -221,7 +248,7 @@ class GroupMetadataTest extends JUnitSuite {
 
     val otherMemberId = "otherMemberId"
     val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
-      List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
+      protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
     group.add(otherMemberId, otherMember)
 
@@ -229,6 +256,116 @@ class GroupMetadataTest extends JUnitSuite {
     assertFalse(group.supportsProtocols(Set("range", "foo")))
   }
 
+  @Test
+  def testInitNextGeneration() {
+    val protocolType = "consumer"
+    val groupId = "groupId"
+    val clientId = "clientId"
+    val clientHost = "clientHost"
+    val sessionTimeoutMs = 10000
+    val memberId = "memberId"
+
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
+      protocolType, List(("roundrobin", Array.empty[Byte])))
+
+    group.transitionTo(PreparingRebalance)
+    member.awaitingJoinCallback = (result) => {}
+    group.add(memberId, member)
+
+    assertEquals(0, group.generationId)
+    assertNull(group.protocol)
+
+    group.initNextGeneration()
+
+    assertEquals(1, group.generationId)
+    assertEquals("roundrobin", group.protocol)
+  }
+
+  @Test
+  def testInitNextGenerationEmptyGroup() {
+    assertEquals(Empty, group.currentState)
+    assertEquals(0, group.generationId)
+    assertNull(group.protocol)
+
+    group.transitionTo(PreparingRebalance)
+    group.initNextGeneration()
+
+    assertEquals(1, group.generationId)
+    assertNull(group.protocol)
+  }
+
+  @Test
+  def testOffsetCommit(): Unit = {
+    val partition = new TopicPartition("foo", 0)
+    val offset = OffsetAndMetadata(37)
+
+    group.prepareOffsetCommit(Map(partition -> offset))
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+
+    group.completePendingOffsetWrite(partition, offset)
+    assertTrue(group.hasOffsets)
+    assertEquals(Some(offset), group.offset(partition))
+  }
+
+  @Test
+  def testOffsetCommitFailure(): Unit = {
+    val partition = new TopicPartition("foo", 0)
+    val offset = OffsetAndMetadata(37)
+
+    group.prepareOffsetCommit(Map(partition -> offset))
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+
+    group.failPendingOffsetWrite(partition, offset)
+    assertFalse(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+  }
+
+  @Test
+  def testOffsetCommitFailureWithAnotherPending(): Unit = {
+    val partition = new TopicPartition("foo", 0)
+    val firstOffset = OffsetAndMetadata(37)
+    val secondOffset = OffsetAndMetadata(57)
+
+    group.prepareOffsetCommit(Map(partition -> firstOffset))
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+
+    group.prepareOffsetCommit(Map(partition -> secondOffset))
+    assertTrue(group.hasOffsets)
+
+    group.failPendingOffsetWrite(partition, firstOffset)
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+
+    group.completePendingOffsetWrite(partition, secondOffset)
+    assertTrue(group.hasOffsets)
+    assertEquals(Some(secondOffset), group.offset(partition))
+  }
+
+  @Test
+  def testOffsetCommitWithAnotherPending(): Unit = {
+    val partition = new TopicPartition("foo", 0)
+    val firstOffset = OffsetAndMetadata(37)
+    val secondOffset = OffsetAndMetadata(57)
+
+    group.prepareOffsetCommit(Map(partition -> firstOffset))
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+
+    group.prepareOffsetCommit(Map(partition -> secondOffset))
+    assertTrue(group.hasOffsets)
+
+    group.completePendingOffsetWrite(partition, firstOffset)
+    assertTrue(group.hasOffsets)
+    assertEquals(Some(firstOffset), group.offset(partition))
+
+    group.completePendingOffsetWrite(partition, secondOffset)
+    assertTrue(group.hasOffsets)
+    assertEquals(Some(secondOffset), group.offset(partition))
+  }
+
   private def assertState(group: GroupMetadata, targetState: GroupState) {
     val states: Set[GroupState] = Set(Stable, PreparingRebalance, AwaitingSync, Dead)
     val otherStates = states - targetState

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
index 88eb9ae..0688424 100644
--- a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
@@ -27,6 +27,7 @@ class MemberMetadataTest extends JUnitSuite {
   val clientId = "clientId"
   val clientHost = "clientHost"
   val memberId = "memberId"
+  val protocolType = "consumer"
   val sessionTimeoutMs = 10000
 
 
@@ -34,7 +35,7 @@ class MemberMetadataTest extends JUnitSuite {
   def testMatchesSupportedProtocols {
     val protocols = List(("range", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocolType, protocols)
     assertTrue(member.matches(protocols))
     assertFalse(member.matches(List(("range", Array[Byte](0)))))
     assertFalse(member.matches(List(("roundrobin", Array.empty[Byte]))))
@@ -45,7 +46,7 @@ class MemberMetadataTest extends JUnitSuite {
   def testVoteForPreferredProtocol {
     val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocolType, protocols)
     assertEquals("range", member.vote(Set("range", "roundrobin")))
     assertEquals("roundrobin", member.vote(Set("blah", "roundrobin")))
   }
@@ -54,7 +55,7 @@ class MemberMetadataTest extends JUnitSuite {
   def testMetadata {
     val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1)))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocolType, protocols)
     assertTrue(util.Arrays.equals(Array[Byte](0), member.metadata("range")))
     assertTrue(util.Arrays.equals(Array[Byte](1), member.metadata("roundrobin")))
   }
@@ -63,7 +64,7 @@ class MemberMetadataTest extends JUnitSuite {
   def testMetadataRaisesOnUnsupportedProtocol {
     val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocolType, protocols)
     member.metadata("blah")
     fail()
   }
@@ -72,7 +73,7 @@ class MemberMetadataTest extends JUnitSuite {
   def testVoteRaisesOnNoSupportedProtocols {
     val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
 
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocols)
+    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
protocolType, protocols)
     member.vote(Set("blah"))
     fail()
   }


Mime
View raw message