kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-2245; Add response tests for consumer coordinator; reviewed by Joel Koshy
Date Mon, 22 Jun 2015 17:14:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk dc54055d0 -> 1eac3ceaf


KAFKA-2245; Add response tests for consumer coordinator; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1eac3cea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1eac3cea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1eac3cea

Branch: refs/heads/trunk
Commit: 1eac3ceaf94b3e7583c7b6de2cfe13539ab06dd6
Parents: dc54055
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Mon Jun 22 10:14:14 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Mon Jun 22 10:14:14 2015 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/ConsumerCoordinator.scala |   4 +-
 .../kafka/coordinator/CoordinatorMetadata.scala |   4 +-
 .../ConsumerCoordinatorResponseTest.scala       | 293 +++++++++++++++++++
 3 files changed, 297 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1eac3cea/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 51e89c8..a385adb 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -46,8 +46,8 @@ class ConsumerCoordinator(val config: KafkaConfig,
   private var coordinatorMetadata: CoordinatorMetadata = null
 
   /**
-   * NOTE: If a group lock and coordinatorLock are simultaneously needed,
-   * be sure to acquire the group lock before coordinatorLock to prevent deadlock
+   * NOTE: If a group lock and metadataLock are simultaneously needed,
+   * be sure to acquire the group lock before metadataLock to prevent deadlock
    */
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/1eac3cea/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index c39e6de..0cd5605 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -37,8 +37,8 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
                                                maybePrepareRebalance: ConsumerGroupMetadata => Unit) {
 
   /**
-   * NOTE: If a group lock and coordinatorLock are simultaneously needed,
-   * be sure to acquire the group lock before coordinatorLock to prevent deadlock
+   * NOTE: If a group lock and metadataLock are simultaneously needed,
+   * be sure to acquire the group lock before metadataLock to prevent deadlock
    */
   private val metadataLock = new ReentrantReadWriteLock()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1eac3cea/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
new file mode 100644
index 0000000..a44fbd6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+
+import java.util.concurrent.TimeUnit
+
+import junit.framework.Assert._
+import kafka.common.TopicAndPartition
+import kafka.server.{KafkaConfig, OffsetManager}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.JoinGroupRequest
+import org.easymock.EasyMock
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future, Promise}
+
+/**
+ * Test ConsumerCoordinator responses
+ */
+class ConsumerCoordinatorResponseTest extends JUnitSuite {
+  type JoinGroupCallbackParams = (Set[TopicAndPartition], String, Int, Short)
+  type JoinGroupCallback = (Set[TopicAndPartition], String, Int, Short) => Unit
+  type HeartbeatCallbackParams = Short
+  type HeartbeatCallback = Short => Unit
+
+  val ConsumerMinSessionTimeout = 10
+  val ConsumerMaxSessionTimeout = 30
+  val DefaultSessionTimeout = 20
+  var offsetManager: OffsetManager = null
+  var consumerCoordinator: ConsumerCoordinator = null
+
+  @Before
+  def setUp() {
+    val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    props.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
+    props.setProperty(KafkaConfig.ConsumerMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
+    offsetManager = EasyMock.createStrictMock(classOf[OffsetManager])
+    consumerCoordinator = new ConsumerCoordinator(KafkaConfig.fromProps(props), null, offsetManager)
+    consumerCoordinator.startup()
+  }
+
+  @After
+  def tearDown() {
+    consumerCoordinator.shutdown()
+  }
+
+  @Test
+  def testJoinGroupWrongCoordinator() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = false)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupUnknownPartitionAssignmentStrategy() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "foo"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupSessionTimeoutTooSmall() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMinSessionTimeout - 1, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupSessionTimeoutTooLarge() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMaxSessionTimeout + 1, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupUnknownConsumerNewGroup() {
+    val groupId = "groupId"
+    val consumerId = "consumerId"
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testValidJoinGroup() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupInconsistentPartitionAssignmentStrategy() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "range"
+    val otherPartitionAssignmentStrategy = "roundrobin"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, otherPartitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val otherJoinGroupErrorCode = otherJoinGroupResult._4
+    assertEquals(Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code, otherJoinGroupErrorCode)
+  }
+
+  @Test
+  def testJoinGroupUnknownConsumerExistingGroup() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val otherConsumerId = "consumerId"
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val otherJoinGroupErrorCode = otherJoinGroupResult._4
+    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, otherJoinGroupErrorCode)
+  }
+
+  @Test
+  def testHeartbeatWrongCoordinator() {
+    val groupId = "groupId"
+    val consumerId = "consumerId"
+
+    val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = false)
+    assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatUnknownGroup() {
+    val groupId = "groupId"
+    val consumerId = "consumerId"
+
+    val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = true)
+    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatUnknownConsumerExistingGroup() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val otherConsumerId = "consumerId"
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val heartbeatResult = heartbeat(groupId, otherConsumerId, 1, isCoordinatorForGroup = true)
+    assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatIllegalGeneration() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val assignedConsumerId = joinGroupResult._2
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 2, isCoordinatorForGroup = true)
+    assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult)
+  }
+
+  @Test
+  def testValidHeartbeat() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val assignedConsumerId = joinGroupResult._2
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1, isCoordinatorForGroup = true)
+    assertEquals(Errors.NONE.code, heartbeatResult)
+  }
+
+  @Test
+  def testGenerationIdIncrementsOnRebalance() {
+    val groupId = "groupId"
+    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val partitionAssignmentStrategy = "range"
+
+    val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val initialGenerationId = joinGroupResult._3
+    val joinGroupErrorCode = joinGroupResult._4
+    assertEquals(1, initialGenerationId)
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(offsetManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
+    val nextGenerationId = otherJoinGroupResult._3
+    val otherJoinGroupErrorCode = otherJoinGroupResult._4
+    assertEquals(2, nextGenerationId)
+    assertEquals(Errors.NONE.code, otherJoinGroupErrorCode)
+  }
+
+  private def setupJoinGroupCallback: (Future[JoinGroupCallbackParams], JoinGroupCallback) = {
+    val responsePromise = Promise[JoinGroupCallbackParams]
+    val responseFuture = responsePromise.future
+    val responseCallback: JoinGroupCallback = (partitions, consumerId, generationId, errorCode) =>
+      responsePromise.success((partitions, consumerId, generationId, errorCode))
+    (responseFuture, responseCallback)
+  }
+
+  private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = {
+    val responsePromise = Promise[HeartbeatCallbackParams]
+    val responseFuture = responsePromise.future
+    val responseCallback: HeartbeatCallback = errorCode => responsePromise.success(errorCode)
+    (responseFuture, responseCallback)
+  }
+
+  private def joinGroup(groupId: String,
+                        consumerId: String,
+                        partitionAssignmentStrategy: String,
+                        sessionTimeout: Int,
+                        isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = {
+    val (responseFuture, responseCallback) = setupJoinGroupCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleJoinGroup(groupId, consumerId, Set.empty, sessionTimeout, partitionAssignmentStrategy, responseCallback)
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+  }
+
+  private def heartbeat(groupId: String,
+                        consumerId: String,
+                        generationId: Int,
+                        isCoordinatorForGroup: Boolean): HeartbeatCallbackParams = {
+    val (responseFuture, responseCallback) = setupHeartbeatCallback
+    EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1)
+    EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup)
+    EasyMock.replay(offsetManager)
+    consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+  }
+}


Mime
View raw message