kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] kafka git commit: KAFKA-2273; Sticky partition assignment strategy (KIP-54)
Date Thu, 18 May 2017 03:17:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9815e18fe -> e1abf1770


http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 4a49833..e565ce2 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -399,7 +399,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     assertEquals(0, consumer0.assignment().size)
 
-    val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this 
+    val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this
     consumer0.subscribe(pattern1, new TestConsumerReassignmentListener)
     consumer0.poll(50)
 
@@ -883,6 +883,58 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  def reverse(m: Map[Long, Set[TopicPartition]]) =
+    m.values.toSet.flatten.map(v => (v, m.keys.filter(m(_).contains(v)).head)).toMap
+
+  /**
+   * This test runs the following scenario to verify sticky assignor behavior.
+   * Topics: single-topic, with random number of partitions, where #par is 10, 20, 30, 40, 50, 60, 70, 80, 90, or 100
+   * Consumers: 9 consumers subscribed to the single topic
+   * Expected initial assignment: partitions are assigned to consumers in a round robin fashion.
+   *  - (#par mod 9) consumers will get (#par / 9 + 1) partitions, and the rest get (#par / 9) partitions
+   * Then consumer #10 is added to the list (subscribing to the same single topic)
+   * Expected new assignment:
+   *  - (#par / 10) partition per consumer, where one partition from each of the early (#par mod 9) consumers
+   *    will move to consumer #10, leading to a total of (#par mod 9) partition movement
+   */
+  @Test
+  def testMultiConsumerStickyAssignment() {
+    this.consumers.clear()
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "sticky-group")
+    this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
classOf[StickyAssignor].getName)
+
+    // create one new topic
+    val topic = "single-topic"
+    val rand = 1 + scala.util.Random.nextInt(10)
+    val partitions = createTopicAndSendRecords(topic, rand * 10, 100)
+
+    // create a group of consumers, subscribe the consumers to the single topic and start polling
+    // for the topic partition assignment
+    val (_, consumerPollers) = createConsumerGroupAndWaitForAssignment(9, List(topic), partitions)
+    validateGroupAssignment(consumerPollers, partitions, s"Did not get valid initial assignment
for partitions ${partitions.asJava}")
+    val prePartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
+
+    // add one more consumer and validate re-assignment
+    addConsumersToGroupAndWaitForGroupAssignment(1, consumers, consumerPollers, List(topic),
partitions)
+
+    val postPartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId,
poller.consumerAssignment())).toMap)
+    val keys = prePartition2PollerId.keySet.union(postPartition2PollerId.keySet)
+    var changes = 0
+    keys.foreach { key =>
+      val preVal = prePartition2PollerId.get(key)
+      val postVal = postPartition2PollerId.get(key)
+      if (preVal.nonEmpty && postVal.nonEmpty) {
+        if (preVal.get != postVal.get)
+          changes += 1
+      } else
+        changes += 1
+    }
+
+    consumerPollers.foreach(_.shutdown())
+
+    assertEquals("Expected only two topic partitions that have switched to other consumers.",
rand, changes)
+  }
+
   /**
    * This test re-uses BaseConsumerTest's consumers.
    * As a result, it is testing the default assignment strategy set by BaseConsumerTest
@@ -1477,8 +1529,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates
    * consumer poller and starts polling.
    * Assumes that the consumer is not subscribed to any topics yet
-    *
-    * @param consumer consumer
+   *
+   * @param consumer consumer
    * @param topicsToSubscribe topics that this consumer will subscribe to
    * @return consumer poller for the given consumer
    */


Mime
View raw message