Repository: kafka Updated Branches: refs/heads/trunk c6b93fa50 -> c9264b4c8 KAFKA-2769; Multi-consumer integration tests for consumer assignment incl. session timeouts and corresponding fixes -- Refactored multi-consumer integration group assignment validation tests for round-robin assignment -- Added multi-consumer integration tests for session timeout expiration: 1. When a consumer stops polling 2. When a consumer calls close() -- Fixes to issues found with session timeout expiration tests with help from Jason Gustafson: Try to avoid SendFailedException by cancelling the scheduled tasks and ensuring metadata update before sending group leave requests + send leave group request with retries. Author: Anna Povzner Reviewers: Jason Gustafson, Guozhang Wang Closes #472 from apovzner/cpkafka-81 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9264b4c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9264b4c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9264b4c Branch: refs/heads/trunk Commit: c9264b4c8904bf330b70cd84b433a7a141ec9d0e Parents: c6b93fa Author: Anna Povzner Authored: Mon Nov 9 17:07:40 2015 -0800 Committer: Guozhang Wang Committed: Mon Nov 9 17:07:40 2015 -0800 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 1 + .../consumer/internals/ConsumerCoordinator.java | 8 +- .../kafka/api/BaseConsumerTest.scala | 47 +++- .../kafka/api/PlaintextConsumerTest.scala | 253 ++++++++++++++++--- 4 files changed, 267 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c9264b4c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 5b944d0..5b5c8a5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -547,6 +547,7 @@ public abstract class AbstractCoordinator implements Closeable { * Leave the current group and reset local generation/memberId. */ public void maybeLeaveGroup(boolean awaitResponse) { + client.unschedule(heartbeatTask); if (!coordinatorUnknown() && generation > 0) { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. http://git-wip-us.apache.org/repos/asf/kafka/blob/c9264b4c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 1bc4050..95aad6d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -69,6 +69,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final SubscriptionState subscriptions; private final OffsetCommitCallback defaultOffsetCommitCallback; private final boolean autoCommitEnabled; + private DelayedTask autoCommitTask = null; /** * Initialize the coordination manager. 
@@ -112,7 +113,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { addMetadataListener(); if (autoCommitEnabled) - scheduleAutoCommitTask(autoCommitIntervalMs); + this.autoCommitTask = scheduleAutoCommitTask(autoCommitIntervalMs); this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); } @@ -307,6 +308,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { public void close() { client.disableWakeups(); try { + if (autoCommitTask != null) + client.unschedule(autoCommitTask); maybeAutoCommitOffsetsSync(); } finally { super.close(); @@ -358,7 +361,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } - private void scheduleAutoCommitTask(final long interval) { + private DelayedTask scheduleAutoCommitTask(final long interval) { DelayedTask task = new DelayedTask() { public void run(long now) { commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { @@ -372,6 +375,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } }; client.schedule(task, time.milliseconds() + interval); + return task; } private void maybeAutoCommitOffsetsSync() { http://git-wip-us.apache.org/repos/asf/kafka/blob/c9264b4c/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 2e674af..819e690 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -50,6 +50,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") 
this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout + this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000") this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") @@ -316,20 +317,56 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1 } - protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]]) extends ShutdownableThread("daemon-consumer-assignment", false) + protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]], + topicsToSubscribe: List[String]) extends ShutdownableThread("daemon-consumer-assignment", false) { @volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition] + private var topicsSubscription = topicsToSubscribe + @volatile private var subscriptionChanged = false + + val rebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = { + partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*) + } + + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = { + partitionAssignment = Set.empty[TopicPartition] + } + } + consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener) def consumerAssignment(): Set[TopicPartition] = { partitionAssignment } + /** + * Subscribe consumer to a new set of topics. 
+ * Since this method most likely be called from a different thread, this function + * just "schedules" the subscription change, and actual call to consumer.subscribe is done + * in the doWork() method + * + * This method does not allow to change subscription until doWork processes the previous call + * to this method. This is just to avoid race conditions and enough functionality for testing purposes + * @param newTopicsToSubscribe + */ + def subscribe(newTopicsToSubscribe: List[String]): Unit = { + if (subscriptionChanged) { + throw new IllegalStateException("Do not call subscribe until the previous subsribe request is processed.") + } + topicsSubscription = newTopicsToSubscribe + subscriptionChanged = true + } + + def isSubscribeRequestProcessed(): Boolean = { + !subscriptionChanged + } + override def doWork(): Unit = { - consumer.poll(50) - if (consumer.assignment() != partitionAssignment.asJava) { - partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*) + if (subscriptionChanged) { + consumer.subscribe(topicsSubscription.asJava, rebalanceListener) + subscriptionChanged = false } - Thread.sleep(100L) + consumer.poll(50) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c9264b4c/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 93bb229..6fabfdc 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -14,6 +14,7 @@ package kafka.api import java.util.regex.Pattern +import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.ProducerRecord @@ -416,55 +417,104 @@ class PlaintextConsumerTest 
extends BaseConsumerTest { this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group") this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName) - val consumerCount = 10 - val rrConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() - for (i <- 0 until consumerCount) { - rrConsumers += new KafkaConsumer(this.consumerConfig) - } - // create two new topics, total number of partitions must be greater than number of consumers val topic1 = "topic1" val topic2 = "topic2" val subscriptions = createTopicAndSendRecords(topic1, 5, 100) ++ createTopicAndSendRecords(topic2, 8, 100) - // all consumers subscribe to all the topics and start polling + // create a group of consumers, subscribe the consumers to all the topics and start polling // for the topic partition assignment - val consumerPollers = Buffer[ConsumerAssignmentPoller]() - for (consumer <- rrConsumers) { - assertEquals(0, consumer.assignment().size) - consumer.subscribe(List(topic1, topic2).asJava) - val poller = new ConsumerAssignmentPoller(consumer) - consumerPollers += poller - poller.start() - } + val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions) - TestUtils.waitUntilTrue(() => { - val assignments = Buffer[Set[TopicPartition]]() - consumerPollers.foreach(assignments += _.consumerAssignment()) - isPartitionAssignmentValid(assignments, subscriptions) - }, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") + // add one more consumer and validate re-assignment + addConsumersToGroupAndWaitForGroupAssignment(1, rrConsumers, consumerPollers, List(topic1, topic2), subscriptions) + + // done with pollers and consumers + for (poller <- consumerPollers) + poller.shutdown() - // add one more consumer - val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig) - newConsumer.subscribe(List(topic1, 
topic2).asJava) - val newPoller = new ConsumerAssignmentPoller(newConsumer) - rrConsumers += newConsumer - consumerPollers += newPoller - newPoller.start() + for (consumer <- rrConsumers) + consumer.unsubscribe() + } - // wait until topics get re-assigned - TestUtils.waitUntilTrue(() => { - val assignments = Buffer[Set[TopicPartition]]() - consumerPollers.foreach(assignments += _.consumerAssignment()) - isPartitionAssignmentValid(assignments, subscriptions) - }, s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added one more consumer") + /** + * This test re-uses BaseConsumerTest's consumers. + * As a result, it is testing the default assignment strategy set by BaseConsumerTest + */ + @Test + def testMultiConsumerDefaultAssignment() { + // use consumers and topics defined in this class + one more topic + sendRecords(100, tp) + sendRecords(100, tp2) + val topic1 = "topic1" + val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(topic1, 5, 100) + + // subscribe all consumers to all topics and validate the assignment + val consumerPollers = subscribeConsumersAndWaitForAssignment(consumers, List(topic, topic1), subscriptions) + + // add 2 more consumers and validate re-assignment + addConsumersToGroupAndWaitForGroupAssignment(2, consumers, consumerPollers, List(topic, topic1), subscriptions) + + // add one more topic and validate partition re-assignment + val topic2 = "topic2" + val expandedSubscriptions = subscriptions ++ createTopicAndSendRecords(topic2, 3, 100) + changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers, List(topic, topic1, topic2), expandedSubscriptions) + // remove the topic we just added and validate re-assignment + changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers, List(topic, topic1), subscriptions) + + // done with pollers and consumers for (poller <- consumerPollers) poller.shutdown() - for (consumer <- rrConsumers) { + for (consumer <- consumers) consumer.unsubscribe() - } + 
} + + @Test + def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = { + runMultiConsumerSessionTimeoutTest(false) + } + + @Test + def testMultiConsumerSessionTimeoutOnClose(): Unit = { + runMultiConsumerSessionTimeoutTest(true) + } + + def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = { + // use consumers defined in this class plus one additional consumer + // Use topic defined in this class + one additional topic + sendRecords(100, tp) + sendRecords(100, tp2) + val topic1 = "topic1" + val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(topic1, 6, 100) + + // first subscribe consumers that are defined in this class + val consumerPollers = Buffer[ConsumerAssignmentPoller]() + for (consumer <- consumers) + consumerPollers += subscribeConsumerAndStartPolling(consumer, List(topic, topic1)) + + // create one more consumer and add it to the group; we will timeout this consumer + val timeoutConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig) + val expandedConsumers = consumers ++ Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](timeoutConsumer) + val timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List(topic, topic1)) + val expandedPollers = consumerPollers ++ Buffer[ConsumerAssignmentPoller](timeoutPoller) + + // validate the initial assignment + validateGroupAssignment(expandedPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") + + // stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers + timeoutPoller.shutdown() + if (closeConsumer) + timeoutConsumer.close() + + val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong + validateGroupAssignment(consumerPollers, subscriptions, + s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left", 3*maxSessionTimeout) + + // done with pollers and consumers + for (poller 
<- consumerPollers) + poller.shutdown() } /** @@ -481,4 +531,137 @@ class PlaintextConsumerTest extends BaseConsumerTest { } parts } + + /** + * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates + * consumer poller and starts polling. + * Assumes that the consumer is not subscribed to any topics yet + * @param consumer consumer + * @param topicsToSubscribe topics that this consumer will subscribe to + * @return consumer poller for the given consumer + */ + def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]], + topicsToSubscribe: List[String]): ConsumerAssignmentPoller = { + assertEquals(0, consumer.assignment().size) + val consumerPoller = new ConsumerAssignmentPoller(consumer, topicsToSubscribe) + consumerPoller.start() + consumerPoller + } + + /** + * Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to + * 'topicsToSubscribe' topics, waits until consumers get topics assignment, and validates the assignment + * Currently, assignment validation requires that total number of partitions is greater or equal to + * number of consumers (i.e. subscriptions.size >= consumerGroup.size) + * Assumes that topics are already created with partitions corresponding to a given set of topic partitions ('subscriptions') + * + * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller. 
+ * + * @param consumerGroup consumer group + * @param topicsToSubscribe topics to which consumers will subscribe to + * @param subscriptions set of all topic partitions + * @return collection of consumer pollers + */ + def subscribeConsumersAndWaitForAssignment(consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], + topicsToSubscribe: List[String], + subscriptions: Set[TopicPartition]): Buffer[ConsumerAssignmentPoller] = { + val consumerPollers = Buffer[ConsumerAssignmentPoller]() + for (consumer <- consumerGroup) + consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe) + validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") + consumerPollers + } + + /** + * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to + * 'topicsToSubscribe' topics, waits until consumers get topics assignment, and validates the assignment + * Currently, assignment validation requires that total number of partitions is greater or equal to + * number of consumers (i.e. subscriptions.size >= consumerCount) + * Assumes that topics are already created with partitions corresponding to a given set of topic partitions ('subscriptions') + * + * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller. 
+ * + * @param consumerCount number of consumers to create + * @param topicsToSubscribe topics to which consumers will subscribe to + * @param subscriptions set of all topic partitions + * @return collection of created consumers and collection of corresponding consumer pollers + */ + def createConsumerGroupAndWaitForAssignment(consumerCount: Int, + topicsToSubscribe: List[String], + subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = { + assertTrue(consumerCount <= subscriptions.size) + val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() + for (i <- 0 until consumerCount) + consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig) + + // create consumer pollers, wait for assignment and validate it + val consumerPollers = subscribeConsumersAndWaitForAssignment(consumerGroup, topicsToSubscribe, subscriptions) + + (consumerGroup, consumerPollers) + } + + /** + * Create 'numOfConsumersToAdd' consumers add then to the consumer group 'consumerGroup', and create corresponding + * pollers for these consumers. Wait for partition re-assignment and validate. 
+ * + * Currently, assignment validation requires that total number of partitions is greater or equal to + * number of consumers, so subscriptions.size must be greate or equal the resulting number of consumers in the group + * + * @param numOfConsumersToAdd number of consumers to create and add to the consumer group + * @param consumerGroup current consumer group + * @param consumerPollers current consumer pollers + * @param topicsToSubscribe topics to which new consumers will subsribe to + * @param subscriptions set of all topic partitions + */ + def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int, + consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], + consumerPollers: Buffer[ConsumerAssignmentPoller], + topicsToSubscribe: List[String], + subscriptions: Set[TopicPartition]): Unit = { + assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size) + for (i <- 0 until numOfConsumersToAdd) { + val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig) + consumerGroup += newConsumer + consumerPollers += subscribeConsumerAndStartPolling(newConsumer, topicsToSubscribe) + } + + // wait until topics get re-assigned and validate assignment + validateGroupAssignment(consumerPollers, subscriptions, + s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added ${numOfConsumersToAdd} consumer(s)") + } + + /** + * Wait for consumers to get partition assignment and validate it. 
+ * + * @param consumerPollers consumer pollers corresponding to the consumer group we are testing + * @param subscriptions set of all topic partitions + * @param msg message to print when waiting for/validating assignment fails + */ + def validateGroupAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller], + subscriptions: Set[TopicPartition], + msg: String, + waitTime: Long = 10000L): Unit = { + TestUtils.waitUntilTrue(() => { + val assignments = Buffer[Set[TopicPartition]]() + consumerPollers.foreach(assignments += _.consumerAssignment()) + isPartitionAssignmentValid(assignments, subscriptions) + }, msg, waitTime) + } + + def changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller], + topicsToSubscribe: List[String], + subscriptions: Set[TopicPartition]): Unit = { + for (poller <- consumerPollers) + poller.subscribe(topicsToSubscribe) + + // since subscribe call to poller does not actually call consumer subsribe right away, wait + // until subscribe is called on all consumers + TestUtils.waitUntilTrue(() => { + consumerPollers forall (poller => poller.isSubscribeRequestProcessed()) + }, s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}", 1000L) + + validateGroupAssignment(consumerPollers, subscriptions, + s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription") + } + }