kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2769; Multi-consumer integration tests for consumer assignment incl. session timeouts and corresponding fixes
Date Tue, 10 Nov 2015 01:01:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 2b97b6cc0 -> 98b625780


KAFKA-2769; Multi-consumer integration tests for consumer assignment incl. session timeouts
and corresponding fixes

-- Refactored multi-consumer integration group assignment validation tests for round-robin
assignment
-- Added multi-consumer integration tests for session timeout expiration:
   1. When a consumer stops polling
   2. When a consumer calls close()

-- Fixes to issues found with session timeout expiration tests woth help from Jason Gustafson:
Try to avoid  SendFailedException exception by cancelling the scheduled tasks and ensuring
metadata update before sending group leave requests + send leave group request with retries.

Author: Anna Povzner <anna@confluent.io>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #472 from apovzner/cpkafka-81

(cherry picked from commit c9264b4c8904bf330b70cd84b433a7a141ec9d0e)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98b62578
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98b62578
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98b62578

Branch: refs/heads/0.9.0
Commit: 98b625780c1f524c1f4fa55e099c88eb8a036fca
Parents: 2b97b6c
Author: Anna Povzner <anna@confluent.io>
Authored: Mon Nov 9 17:07:40 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Nov 9 17:07:50 2015 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |   1 +
 .../consumer/internals/ConsumerCoordinator.java |   8 +-
 .../kafka/api/BaseConsumerTest.scala            |  47 +++-
 .../kafka/api/PlaintextConsumerTest.scala       | 253 ++++++++++++++++---
 4 files changed, 267 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/98b62578/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 5b944d0..5b5c8a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -547,6 +547,7 @@ public abstract class AbstractCoordinator implements Closeable {
      * Leave the current group and reset local generation/memberId.
      */
     public void maybeLeaveGroup(boolean awaitResponse) {
+        client.unschedule(heartbeatTask);
         if (!coordinatorUnknown() && generation > 0) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.

http://git-wip-us.apache.org/repos/asf/kafka/blob/98b62578/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 1bc4050..95aad6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -69,6 +69,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final SubscriptionState subscriptions;
     private final OffsetCommitCallback defaultOffsetCommitCallback;
     private final boolean autoCommitEnabled;
+    private DelayedTask autoCommitTask = null;
 
     /**
      * Initialize the coordination manager.
@@ -112,7 +113,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         addMetadataListener();
 
         if (autoCommitEnabled)
-            scheduleAutoCommitTask(autoCommitIntervalMs);
+            this.autoCommitTask = scheduleAutoCommitTask(autoCommitIntervalMs);
 
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
     }
@@ -307,6 +308,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     public void close() {
         client.disableWakeups();
         try {
+            if (autoCommitTask != null)
+                client.unschedule(autoCommitTask);
             maybeAutoCommitOffsetsSync();
         } finally {
             super.close();
@@ -358,7 +361,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
-    private void scheduleAutoCommitTask(final long interval) {
+    private DelayedTask scheduleAutoCommitTask(final long interval) {
         DelayedTask task = new DelayedTask() {
             public void run(long now) {
                 commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback()
{
@@ -372,6 +375,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             }
         };
         client.schedule(task, time.milliseconds() + interval);
+        return task;
     }
 
     private void maybeAutoCommitOffsetsSync() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/98b62578/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 2e674af..819e690 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -50,6 +50,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging
{
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't
want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small
enough session timeout
+  this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -316,20 +317,56 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with
Logging {
     override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception:
Exception): Unit = count += 1
   }
 
-  protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]])
extends ShutdownableThread("daemon-consumer-assignment", false)
+  protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],
+                                           topicsToSubscribe: List[String]) extends ShutdownableThread("daemon-consumer-assignment",
false)
   {
     @volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition]
+    private var topicsSubscription = topicsToSubscribe
+    @volatile private var subscriptionChanged = false
+
+    val rebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
+        partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray:
_*)
+      }
+
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {
+        partitionAssignment = Set.empty[TopicPartition]
+      }
+    }
+    consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
 
     def consumerAssignment(): Set[TopicPartition] = {
       partitionAssignment
     }
 
+    /**
+     * Subscribe consumer to a new set of topics.
+     * Since this method most likely be called from a different thread, this function
+     * just "schedules" the subscription change, and actual call to consumer.subscribe is
done
+     * in the doWork() method
+     *
+     * This method does not allow to change subscription until doWork processes the previous
call
+     * to this method. This is just to avoid race conditions and enough functionality for
testing purposes
+     * @param newTopicsToSubscribe
+     */
+    def subscribe(newTopicsToSubscribe: List[String]): Unit = {
+      if (subscriptionChanged) {
+        throw new IllegalStateException("Do not call subscribe until the previous subsribe
request is processed.")
+      }
+      topicsSubscription = newTopicsToSubscribe
+      subscriptionChanged = true
+    }
+
+    def isSubscribeRequestProcessed(): Boolean = {
+      !subscriptionChanged
+    }
+
     override def doWork(): Unit = {
-      consumer.poll(50)
-      if (consumer.assignment() != partitionAssignment.asJava) {
-        partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray:
_*)
+      if (subscriptionChanged) {
+        consumer.subscribe(topicsSubscription.asJava, rebalanceListener)
+        subscriptionChanged = false
       }
-      Thread.sleep(100L)
+      consumer.poll(50)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/98b62578/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 93bb229..6fabfdc 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,6 +14,7 @@ package kafka.api
 
 import java.util.regex.Pattern
 
+import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -416,55 +417,104 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
     this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
classOf[RoundRobinAssignor].getName)
 
-    val consumerCount = 10
-    val rrConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-    for (i <- 0 until consumerCount) {
-      rrConsumers += new KafkaConsumer(this.consumerConfig)
-    }
-
     // create two new topics, total number of partitions must be greater than number of consumers
     val topic1 = "topic1"
     val topic2 = "topic2"
     val subscriptions = createTopicAndSendRecords(topic1, 5, 100) ++ createTopicAndSendRecords(topic2,
8, 100)
 
-    // all consumers subscribe to all the topics and start polling
+    // create a group of consumers, subscribe the consumers to all the topics and start polling
     // for the topic partition assignment
-    val consumerPollers = Buffer[ConsumerAssignmentPoller]()
-    for (consumer <- rrConsumers) {
-      assertEquals(0, consumer.assignment().size)
-      consumer.subscribe(List(topic1, topic2).asJava)
-      val poller = new ConsumerAssignmentPoller(consumer)
-      consumerPollers += poller
-      poller.start()
-    }
+    val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1,
topic2), subscriptions)
 
-    TestUtils.waitUntilTrue(() => {
-      val assignments = Buffer[Set[TopicPartition]]()
-      consumerPollers.foreach(assignments += _.consumerAssignment())
-      isPartitionAssignmentValid(assignments, subscriptions)
-    }, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+    // add one more consumer and validate re-assignment
+    addConsumersToGroupAndWaitForGroupAssignment(1, rrConsumers, consumerPollers, List(topic1,
topic2), subscriptions)
+
+    // done with pollers and consumers
+    for (poller <- consumerPollers)
+      poller.shutdown()
 
-    // add one more consumer
-    val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
-    newConsumer.subscribe(List(topic1, topic2).asJava)
-    val newPoller = new ConsumerAssignmentPoller(newConsumer)
-    rrConsumers += newConsumer
-    consumerPollers += newPoller
-    newPoller.start()
+    for (consumer <- rrConsumers)
+      consumer.unsubscribe()
+  }
 
-    // wait until topics get re-assigned
-    TestUtils.waitUntilTrue(() => {
-      val assignments = Buffer[Set[TopicPartition]]()
-      consumerPollers.foreach(assignments += _.consumerAssignment())
-      isPartitionAssignmentValid(assignments, subscriptions)
-    }, s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added
one more consumer")
+  /**
+   * This test re-uses BaseConsumerTest's consumers.
+   * As a result, it is testing the default assignment strategy set by BaseConsumerTest
+   */
+  @Test
+  def testMultiConsumerDefaultAssignment() {
+    // use consumers and topics defined in this class + one more topic
+    sendRecords(100, tp)
+    sendRecords(100, tp2)
+    val topic1 = "topic1"
+    val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(topic1, 5, 100)
+
+    // subscribe all consumers to all topics and validate the assignment
+    val consumerPollers = subscribeConsumersAndWaitForAssignment(consumers, List(topic, topic1),
subscriptions)
+
+    // add 2 more consumers and validate re-assignment
+    addConsumersToGroupAndWaitForGroupAssignment(2, consumers, consumerPollers, List(topic,
topic1), subscriptions)
+
+    // add one more topic and validate partition re-assignment
+    val topic2 = "topic2"
+    val expandedSubscriptions = subscriptions ++ createTopicAndSendRecords(topic2, 3, 100)
+    changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers, List(topic, topic1,
topic2), expandedSubscriptions)
 
+    // remove the topic we just added and validate re-assignment
+    changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers, List(topic, topic1),
subscriptions)
+
+    // done with pollers and consumers
     for (poller <- consumerPollers)
       poller.shutdown()
 
-    for (consumer <- rrConsumers) {
+    for (consumer <- consumers)
       consumer.unsubscribe()
-    }
+  }
+
+  @Test
+  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+    runMultiConsumerSessionTimeoutTest(false)
+  }
+
+  @Test
+  def testMultiConsumerSessionTimeoutOnClose(): Unit = {
+    runMultiConsumerSessionTimeoutTest(true)
+  }
+
+  def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
+    // use consumers defined in this class plus one additional consumer
+    // Use topic defined in this class + one additional topic
+    sendRecords(100, tp)
+    sendRecords(100, tp2)
+    val topic1 = "topic1"
+    val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(topic1, 6, 100)
+
+    // first subscribe consumers that are defined in this class
+    val consumerPollers = Buffer[ConsumerAssignmentPoller]()
+    for (consumer <- consumers)
+      consumerPollers += subscribeConsumerAndStartPolling(consumer, List(topic, topic1))
+
+    // create one more consumer and add it to the group; we will timeout this consumer
+    val timeoutConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+    val expandedConsumers = consumers ++ Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](timeoutConsumer)
+    val timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List(topic, topic1))
+    val expandedPollers = consumerPollers ++ Buffer[ConsumerAssignmentPoller](timeoutPoller)
+
+    // validate the initial assignment
+    validateGroupAssignment(expandedPollers, subscriptions, s"Did not get valid initial assignment
for partitions ${subscriptions.asJava}")
+
+    // stop polling and close one of the consumers, should trigger partition re-assignment
among alive consumers
+    timeoutPoller.shutdown()
+    if (closeConsumer)
+      timeoutConsumer.close()
+
+    val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong
+    validateGroupAssignment(consumerPollers, subscriptions,
+      s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer
left", 3*maxSessionTimeout)
+
+    // done with pollers and consumers
+    for (poller <- consumerPollers)
+      poller.shutdown()
   }
 
   /**
@@ -481,4 +531,137 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
     parts
   }
+
+  /**
+   * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates
+   * consumer poller and starts polling.
+   * Assumes that the consumer is not subscribed to any topics yet
+   * @param consumer consumer
+   * @param topicsToSubscribe topics that this consumer will subscribe to
+   * @return consumer poller for the given consumer
+   */
+  def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]],
+                                       topicsToSubscribe: List[String]): ConsumerAssignmentPoller
= {
+    assertEquals(0, consumer.assignment().size)
+    val consumerPoller = new ConsumerAssignmentPoller(consumer, topicsToSubscribe)
+    consumerPoller.start()
+    consumerPoller
+  }
+
+  /**
+   * Creates consumer pollers corresponding to a given consumer group, one per consumer;
subscribes consumers to
+   * 'topicsToSubscribe' topics, waits until consumers get topics assignment, and validates
the assignment
+   * Currently, assignment validation requires that total number of partitions is greater
or equal to
+   * number of consumers (i.e. subscriptions.size >= consumerGroup.size)
+   * Assumes that topics are already created with partitions corresponding to a given set
of topic partitions ('subscriptions')
+   *
+   * When the function returns, consumer pollers will continue to poll until shutdown is
called on every poller.
+   *
+   * @param consumerGroup consumer group
+   * @param topicsToSubscribe topics to which consumers will subscribe to
+   * @param subscriptions set of all topic partitions
+   * @return collection of consumer pollers
+   */
+  def subscribeConsumersAndWaitForAssignment(consumerGroup: Buffer[KafkaConsumer[Array[Byte],
Array[Byte]]],
+                                             topicsToSubscribe: List[String],
+                                             subscriptions: Set[TopicPartition]): Buffer[ConsumerAssignmentPoller]
= {
+    val consumerPollers = Buffer[ConsumerAssignmentPoller]()
+    for (consumer <- consumerGroup)
+      consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
+    validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment
for partitions ${subscriptions.asJava}")
+    consumerPollers
+  }
+
+  /**
+   * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes
consumers to
+   * 'topicsToSubscribe' topics, waits until consumers get topics assignment, and validates
the assignment
+   * Currently, assignment validation requires that total number of partitions is greater
or equal to
+   * number of consumers (i.e. subscriptions.size >= consumerCount)
+   * Assumes that topics are already created with partitions corresponding to a given set
of topic partitions ('subscriptions')
+   *
+   * When the function returns, consumer pollers will continue to poll until shutdown is
called on every poller.
+   *
+   * @param consumerCount number of consumers to create
+   * @param topicsToSubscribe topics to which consumers will subscribe to
+   * @param subscriptions set of all topic partitions
+   * @return collection of created consumers and collection of corresponding consumer pollers
+   */
+  def createConsumerGroupAndWaitForAssignment(consumerCount: Int,
+                                              topicsToSubscribe: List[String],
+                                              subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte],
Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = {
+    assertTrue(consumerCount <= subscriptions.size)
+    val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+    for (i <- 0 until consumerCount)
+      consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+
+    // create consumer pollers, wait for assignment and validate it
+    val consumerPollers = subscribeConsumersAndWaitForAssignment(consumerGroup, topicsToSubscribe,
subscriptions)
+
+    (consumerGroup, consumerPollers)
+  }
+
+  /**
+   * Create 'numOfConsumersToAdd' consumers add then to the consumer group 'consumerGroup',
and create corresponding
+   * pollers for these consumers. Wait for partition re-assignment and validate.
+   *
+   * Currently, assignment validation requires that total number of partitions is greater
or equal to
+   * number of consumers, so subscriptions.size must be greate or equal the resulting number
of consumers in the group
+   *
+   * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
+   * @param consumerGroup current consumer group
+   * @param consumerPollers current consumer pollers
+   * @param topicsToSubscribe topics to which new consumers will subsribe to
+   * @param subscriptions set of all topic partitions
+   */
+  def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int,
+                                                   consumerGroup: Buffer[KafkaConsumer[Array[Byte],
Array[Byte]]],
+                                                   consumerPollers: Buffer[ConsumerAssignmentPoller],
+                                                   topicsToSubscribe: List[String],
+                                                   subscriptions: Set[TopicPartition]): Unit
= {
+    assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
+    for (i <- 0 until numOfConsumersToAdd) {
+      val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+      consumerGroup += newConsumer
+      consumerPollers += subscribeConsumerAndStartPolling(newConsumer, topicsToSubscribe)
+    }
+
+    // wait until topics get re-assigned and validate assignment
+    validateGroupAssignment(consumerPollers, subscriptions,
+        s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added
${numOfConsumersToAdd} consumer(s)")
+  }
+
+  /**
+   * Wait for consumers to get partition assignment and validate it.
+   *
+   * @param consumerPollers consumer pollers corresponding to the consumer group we are testing
+   * @param subscriptions set of all topic partitions
+   * @param msg message to print when waiting for/validating assignment fails
+   */
+  def validateGroupAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
+                              subscriptions: Set[TopicPartition],
+                              msg: String,
+                              waitTime: Long = 10000L): Unit = {
+    TestUtils.waitUntilTrue(() => {
+      val assignments = Buffer[Set[TopicPartition]]()
+      consumerPollers.foreach(assignments += _.consumerAssignment())
+      isPartitionAssignmentValid(assignments, subscriptions)
+    }, msg, waitTime)
+  }
+
+  def changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
+                                                           topicsToSubscribe: List[String],
+                                                           subscriptions: Set[TopicPartition]):
Unit = {
+    for (poller <- consumerPollers)
+      poller.subscribe(topicsToSubscribe)
+
+    // since subscribe call to poller does not actually call consumer subsribe right away,
wait
+    // until subscribe is called on all consumers
+    TestUtils.waitUntilTrue(() => {
+      consumerPollers forall (poller => poller.isSubscribeRequestProcessed())
+    }, s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}",
1000L)
+
+    validateGroupAssignment(consumerPollers, subscriptions,
+        s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed
subscription")
+  }
+
 }


Mime
View raw message