kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2434: Remove identical topic subscription constraint for roundrobin strategy in old consumer API
Date Tue, 03 Jan 2017 16:57:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cea2a669b -> 9e9e6d875


KAFKA-2434: Remove identical topic subscription constraint for roundrobin strategy in old consumer API

Author: Andrew Olson <aolson1@cerner.com>

Reviewers: Onur Karaman

Closes #145 from noslowerdna/KAFKA-2434


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e9e6d87
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e9e6d87
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e9e6d87

Branch: refs/heads/trunk
Commit: 9e9e6d8752646436cb836af03a8906605fa76471
Parents: cea2a66
Author: Andrew Olson <aolson1@cerner.com>
Authored: Tue Jan 3 08:57:30 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jan 3 08:57:30 2017 -0800

----------------------------------------------------------------------
 .../kafka/consumer/PartitionAssignor.scala      | 31 ++++-------
 .../consumer/ZookeeperConsumerConnector.scala   |  2 +-
 .../kafka/consumer/PartitionAssignorTest.scala  | 55 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e9e6d87/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 900a4b6..f02df35 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -47,12 +47,13 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo
     myTopicCount.getConsumerThreadIdsPerTopic
   }
 
-  val partitionsForTopic: collection.Map[String, Seq[Int]] =
-    zkUtils.getPartitionsForTopics(myTopicThreadIds.keySet.toSeq)
-
   val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
     zkUtils.getConsumersPerTopic(group, excludeInternalTopics)
 
+  // Some assignment strategies require knowledge of all topics consumed by any member of the group
+  val partitionsForTopic: collection.Map[String, Seq[Int]] =
+    zkUtils.getPartitionsForTopics(consumersForTopic.keySet.toSeq)
+
   val consumers: Seq[String] = zkUtils.getConsumersInGroup(group).sorted
 }
 
@@ -61,13 +62,7 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo
  * then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer
  * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
  * will be within a delta of exactly one across all consumer threads.)
- *
- * (For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance
- * and thread-id within that instance. Therefore, round-robin assignment is allowed only if:
- * a) Every topic has the same number of streams within a consumer instance
- * b) The set of subscribed topics is identical for every consumer instance within the group.
  */
-
 class RoundRobinAssignor() extends PartitionAssignor with Logging {
 
   def assign(ctx: AssignmentContext) = {
@@ -77,18 +72,12 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
       new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
 
     if (ctx.consumersForTopic.nonEmpty) {
-      // check conditions (a) and (b)
-      val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
-      ctx.consumersForTopic.foreach { case (topic, threadIds) =>
-        val threadIdSet = threadIds.toSet
-        require(threadIdSet == headThreadIdSet,
-          "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
-            "AND if the stream counts across topics are identical for a given consumer instance.\n" +
-            "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) +
-            "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet))
-      }
+      // Collect consumer thread ids across all topics, remove duplicates, and sort to ensure determinism
+      val allThreadIds = ctx.consumersForTopic.flatMap { case (topic, threadIds) =>
+         threadIds
+      }.toSet.toSeq.sorted
 
-      val threadAssignor = CoreUtils.circularIterator(headThreadIdSet.toSeq.sorted)
+      val threadAssignor = CoreUtils.circularIterator(allThreadIds)
 
       info("Starting round-robin assignment with consumers " + ctx.consumers)
       val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) =>
@@ -106,7 +95,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
       })
 
       allTopicPartitions.foreach(topicPartition => {
-        val threadId = threadAssignor.next()
+        val threadId = threadAssignor.dropWhile(threadId => !ctx.consumersForTopic(topicPartition.topic).contains(threadId)).next
         // record the partition ownership decision
         val assignmentForConsumer = partitionAssignment.getAndMaybePut(threadId.consumer)
         assignmentForConsumer += (topicPartition -> threadId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e9e6d87/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 4ef32e9..9bf0d20 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -853,7 +853,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         } catch {
           case _: ZkNodeExistsException =>
             // The node hasn't been deleted by the original owner. So wait a bit and retry.
-            info("waiting for the partition ownership to be deleted: " + partition)
+            info("waiting for the partition ownership to be deleted: " + partition + " for topic " + topic)
             false
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e9e6d87/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 9e568f8..1e45bfb 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -55,6 +55,61 @@ class PartitionAssignorTest extends Logging {
   }
 
   @Test
+  def testRoundRobinPartitionAssignorStaticSubscriptions() {
+    val assignor = new RoundRobinAssignor
+
+    /** test static subscription scenarios */
+    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+      val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+      val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
+      }).toSeq:_*)
+
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        val streamCounts = Map((1 to topicCount).map(topic => {
+            ("topic-" + topic, 1)
+          }).toSeq:_*)
+        ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
+      }).toSeq:_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkUtils.zkClient)
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, verifyAssignmentIsUniform = true)
+    })
+  }
+
+  @Test
+  def testRoundRobinPartitionAssignorUnbalancedStaticSubscriptions() {
+    val assignor = new RoundRobinAssignor
+    val minConsumerCount = 5
+
+    /** test unbalanced static subscription scenarios */
+    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+      val consumerCount = minConsumerCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
+      val topicCount = 10
+
+      val topicPartitionCounts = Map((1 to topicCount).map(topic => {
+        ("topic-" + topic, 10)
+      }).toSeq:_*)
+
+      val subscriptions = Map((1 to consumerCount).map(consumer => {
+        // Exclude some topics from some consumers
+        val topicRange = (1 to topicCount - consumer % minConsumerCount)
+        val streamCounts = Map(topicRange.map(topic => {
+            ("topic-" + topic, 3)
+          }).toSeq:_*)
+        ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
+      }).toSeq:_*)
+      val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
+      val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
+      EasyMock.replay(zkUtils.zkClient)
+      PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils)
+    })
+  }
+
+  @Test
   def testRangePartitionAssignor() {
     val assignor = new RangeAssignor
     (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>


Mime
View raw message