Repository: kafka
Updated Branches:
refs/heads/0.9.0 2b97b6cc0 -> 98b625780
KAFKA-2769; Multi-consumer integration tests for consumer assignment incl. session timeouts
and corresponding fixes
-- Refactored multi-consumer integration group assignment validation tests for round-robin
assignment
-- Added multi-consumer integration tests for session timeout expiration (sketched below):
1. When a consumer stops polling
2. When a consumer calls close()
-- Fixes to issues found with the session timeout expiration tests, with help from Jason Gustafson:
try to avoid SendFailedException by cancelling the scheduled tasks and ensuring a metadata update
before sending the group leave request, and send the leave group request with retries.
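
For context, the new session-timeout scenario boils down to the following condensed Scala sketch
(helper and config names are taken from the test diff below; harness setup and record production
are omitted, so this is illustrative rather than a complete test):

  // one extra consumer joins the group and is then "lost", either by stopping its
  // poller only, or by stopping the poller and also calling close()
  val timeoutConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
  val timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List(topic, topic1))
  timeoutPoller.shutdown()
  if (closeConsumer)
    timeoutConsumer.close()

  // the surviving consumers should be re-assigned all partitions within a few session timeouts
  val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong
  validateGroupAssignment(consumerPollers, subscriptions,
    s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer left",
    3 * maxSessionTimeout)
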
Author: Anna Povzner <anna@confluent.io>
Reviewers: Jason Gustafson, Guozhang Wang
Closes #472 from apovzner/cpkafka-81
(cherry picked from commit c9264b4c8904bf330b70cd84b433a7a141ec9d0e)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98b62578
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98b62578
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98b62578
Branch: refs/heads/0.9.0
Commit: 98b625780c1f524c1f4fa55e099c88eb8a036fca
Parents: 2b97b6c
Author: Anna Povzner <anna@confluent.io>
Authored: Mon Nov 9 17:07:40 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Nov 9 17:07:50 2015 -0800
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 1 +
.../consumer/internals/ConsumerCoordinator.java | 8 +-
.../kafka/api/BaseConsumerTest.scala | 47 +++-
.../kafka/api/PlaintextConsumerTest.scala | 253 ++++++++++++++++---
4 files changed, 267 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/98b62578/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 5b944d0..5b5c8a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -547,6 +547,7 @@ public abstract class AbstractCoordinator implements Closeable {
* Leave the current group and reset local generation/memberId.
*/
public void maybeLeaveGroup(boolean awaitResponse) {
+ client.unschedule(heartbeatTask);
if (!coordinatorUnknown() && generation > 0) {
// this is a minimal effort attempt to leave the group. we do not
// attempt any resending if the request fails or times out.
http://git-wip-us.apache.org/repos/asf/kafka/blob/98b62578/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 1bc4050..95aad6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -69,6 +69,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final SubscriptionState subscriptions;
private final OffsetCommitCallback defaultOffsetCommitCallback;
private final boolean autoCommitEnabled;
+ private DelayedTask autoCommitTask = null;
/**
* Initialize the coordination manager.
@@ -112,7 +113,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
addMetadataListener();
if (autoCommitEnabled)
- scheduleAutoCommitTask(autoCommitIntervalMs);
+ this.autoCommitTask = scheduleAutoCommitTask(autoCommitIntervalMs);
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
}
@@ -307,6 +308,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
public void close() {
client.disableWakeups();
try {
+ if (autoCommitTask != null)
+ client.unschedule(autoCommitTask);
maybeAutoCommitOffsetsSync();
} finally {
super.close();
@@ -358,7 +361,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
- private void scheduleAutoCommitTask(final long interval) {
+ private DelayedTask scheduleAutoCommitTask(final long interval) {
DelayedTask task = new DelayedTask() {
public void run(long now) {
commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
@@ -372,6 +375,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
};
client.schedule(task, time.milliseconds() + interval);
+ return task;
}
private void maybeAutoCommitOffsetsSync() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/98b62578/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 2e674af..819e690 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -50,6 +50,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+ this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -316,20 +317,56 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
}
- protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]]) extends ShutdownableThread("daemon-consumer-assignment", false) {
+ protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],
+   topicsToSubscribe: List[String]) extends ShutdownableThread("daemon-consumer-assignment", false) {
@volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition]
+ private var topicsSubscription = topicsToSubscribe
+ @volatile private var subscriptionChanged = false
+
+ val rebalanceListener = new ConsumerRebalanceListener {
+ override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
+ partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*)
+ }
+
+ override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {
+ partitionAssignment = Set.empty[TopicPartition]
+ }
+ }
+ consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
def consumerAssignment(): Set[TopicPartition] = {
partitionAssignment
}
+ /**
+ * Subscribe consumer to a new set of topics.
+ * Since this method is most likely called from a different thread, this function
+ * just "schedules" the subscription change, and the actual call to consumer.subscribe is done
+ * in the doWork() method.
+ *
+ * This method does not allow changing the subscription until doWork processes the previous call
+ * to this method. This is just to avoid race conditions and is enough functionality for testing purposes.
+ * @param newTopicsToSubscribe
+ */
+ def subscribe(newTopicsToSubscribe: List[String]): Unit = {
+ if (subscriptionChanged) {
+ throw new IllegalStateException("Do not call subscribe until the previous subscribe request is processed.")
+ }
+ topicsSubscription = newTopicsToSubscribe
+ subscriptionChanged = true
+ }
+
+ def isSubscribeRequestProcessed(): Boolean = {
+ !subscriptionChanged
+ }
+
override def doWork(): Unit = {
- consumer.poll(50)
- if (consumer.assignment() != partitionAssignment.asJava) {
- partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*)
+ if (subscriptionChanged) {
+ consumer.subscribe(topicsSubscription.asJava, rebalanceListener)
+ subscriptionChanged = false
}
- Thread.sleep(100L)
+ consumer.poll(50)
}
}
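
The poller now owns the subscription lifecycle. A rough usage sketch, assuming a consumer built from
this.consumerConfig and topics that already exist in the harness (names follow the diff above):

  // subscribing happens in the constructor, with a rebalance listener that tracks the assignment
  val poller = new ConsumerAssignmentPoller(consumer, List("topic1", "topic2"))
  poller.start()

  // a subscription change is only scheduled here; doWork() applies it on the polling thread
  poller.subscribe(List("topic1"))
  TestUtils.waitUntilTrue(() => poller.isSubscribeRequestProcessed(),
    "subscription change was not applied by the polling thread")

  // assignment as observed through the rebalance listener callbacks
  val assigned: Set[TopicPartition] = poller.consumerAssignment()
  poller.shutdown()
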
http://git-wip-us.apache.org/repos/asf/kafka/blob/98b62578/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 93bb229..6fabfdc 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,6 +14,7 @@ package kafka.api
import java.util.regex.Pattern
+import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.ProducerRecord
@@ -416,55 +417,104 @@ class PlaintextConsumerTest extends BaseConsumerTest {
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName)
- val consumerCount = 10
- val rrConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
- for (i <- 0 until consumerCount) {
- rrConsumers += new KafkaConsumer(this.consumerConfig)
- }
-
// create two new topics, total number of partitions must be greater than number of consumers
val topic1 = "topic1"
val topic2 = "topic2"
val subscriptions = createTopicAndSendRecords(topic1, 5, 100) ++ createTopicAndSendRecords(topic2, 8, 100)
- // all consumers subscribe to all the topics and start polling
+ // create a group of consumers, subscribe the consumers to all the topics and start polling
// for the topic partition assignment
- val consumerPollers = Buffer[ConsumerAssignmentPoller]()
- for (consumer <- rrConsumers) {
- assertEquals(0, consumer.assignment().size)
- consumer.subscribe(List(topic1, topic2).asJava)
- val poller = new ConsumerAssignmentPoller(consumer)
- consumerPollers += poller
- poller.start()
- }
+ val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
- TestUtils.waitUntilTrue(() => {
- val assignments = Buffer[Set[TopicPartition]]()
- consumerPollers.foreach(assignments += _.consumerAssignment())
- isPartitionAssignmentValid(assignments, subscriptions)
- }, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+ // add one more consumer and validate re-assignment
+ addConsumersToGroupAndWaitForGroupAssignment(1, rrConsumers, consumerPollers, List(topic1, topic2), subscriptions)
+
+ // done with pollers and consumers
+ for (poller <- consumerPollers)
+ poller.shutdown()
- // add one more consumer
- val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
- newConsumer.subscribe(List(topic1, topic2).asJava)
- val newPoller = new ConsumerAssignmentPoller(newConsumer)
- rrConsumers += newConsumer
- consumerPollers += newPoller
- newPoller.start()
+ for (consumer <- rrConsumers)
+ consumer.unsubscribe()
+ }
- // wait until topics get re-assigned
- TestUtils.waitUntilTrue(() => {
- val assignments = Buffer[Set[TopicPartition]]()
- consumerPollers.foreach(assignments += _.consumerAssignment())
- isPartitionAssignmentValid(assignments, subscriptions)
- }, s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added
one more consumer")
+ /**
+ * This test re-uses BaseConsumerTest's consumers.
+ * As a result, it is testing the default assignment strategy set by BaseConsumerTest
+ */
+ @Test
+ def testMultiConsumerDefaultAssignment() {
+ // use consumers and topics defined in this class + one more topic
+ sendRecords(100, tp)
+ sendRecords(100, tp2)
+ val topic1 = "topic1"
+ val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(topic1, 5, 100)
+
+ // subscribe all consumers to all topics and validate the assignment
+ val consumerPollers = subscribeConsumersAndWaitForAssignment(consumers, List(topic, topic1), subscriptions)
+
+ // add 2 more consumers and validate re-assignment
+ addConsumersToGroupAndWaitForGroupAssignment(2, consumers, consumerPollers, List(topic, topic1), subscriptions)
+
+ // add one more topic and validate partition re-assignment
+ val topic2 = "topic2"
+ val expandedSubscriptions = subscriptions ++ createTopicAndSendRecords(topic2, 3, 100)
+ changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers, List(topic, topic1, topic2), expandedSubscriptions)
+ // remove the topic we just added and validate re-assignment
+ changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers, List(topic, topic1), subscriptions)
+
+ // done with pollers and consumers
for (poller <- consumerPollers)
poller.shutdown()
- for (consumer <- rrConsumers) {
+ for (consumer <- consumers)
consumer.unsubscribe()
- }
+ }
+
+ @Test
+ def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+ runMultiConsumerSessionTimeoutTest(false)
+ }
+
+ @Test
+ def testMultiConsumerSessionTimeoutOnClose(): Unit = {
+ runMultiConsumerSessionTimeoutTest(true)
+ }
+
+ def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
+ // use consumers defined in this class plus one additional consumer
+ // Use topic defined in this class + one additional topic
+ sendRecords(100, tp)
+ sendRecords(100, tp2)
+ val topic1 = "topic1"
+ val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(topic1, 6, 100)
+
+ // first subscribe consumers that are defined in this class
+ val consumerPollers = Buffer[ConsumerAssignmentPoller]()
+ for (consumer <- consumers)
+ consumerPollers += subscribeConsumerAndStartPolling(consumer, List(topic, topic1))
+
+ // create one more consumer and add it to the group; we will timeout this consumer
+ val timeoutConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+ val expandedConsumers = consumers ++ Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](timeoutConsumer)
+ val timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List(topic, topic1))
+ val expandedPollers = consumerPollers ++ Buffer[ConsumerAssignmentPoller](timeoutPoller)
+
+ // validate the initial assignment
+ validateGroupAssignment(expandedPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+
+ // stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers
+ timeoutPoller.shutdown()
+ if (closeConsumer)
+ timeoutConsumer.close()
+
+ val maxSessionTimeout = this.serverConfig.getProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp).toLong
+ validateGroupAssignment(consumerPollers, subscriptions,
+ s"Did not get valid assignment for partitions ${subscriptions.asJava} after one consumer
left", 3*maxSessionTimeout)
+
+ // done with pollers and consumers
+ for (poller <- consumerPollers)
+ poller.shutdown()
}
/**
@@ -481,4 +531,137 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
parts
}
+
+ /**
+ * Subscribes consumer 'consumer' to a given list of topics 'topicsToSubscribe', creates
+ * consumer poller and starts polling.
+ * Assumes that the consumer is not subscribed to any topics yet
+ * @param consumer consumer
+ * @param topicsToSubscribe topics that this consumer will subscribe to
+ * @return consumer poller for the given consumer
+ */
+ def subscribeConsumerAndStartPolling(consumer: Consumer[Array[Byte], Array[Byte]],
+ topicsToSubscribe: List[String]): ConsumerAssignmentPoller = {
+ assertEquals(0, consumer.assignment().size)
+ val consumerPoller = new ConsumerAssignmentPoller(consumer, topicsToSubscribe)
+ consumerPoller.start()
+ consumerPoller
+ }
+
+ /**
+ * Creates consumer pollers corresponding to a given consumer group, one per consumer; subscribes consumers to
+ * 'topicsToSubscribe' topics, waits until consumers get topics assignment, and validates the assignment
+ * Currently, assignment validation requires that total number of partitions is greater or equal to
+ * number of consumers (i.e. subscriptions.size >= consumerGroup.size)
+ * Assumes that topics are already created with partitions corresponding to a given set of topic partitions ('subscriptions')
+ *
+ * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
+ *
+ * @param consumerGroup consumer group
+ * @param topicsToSubscribe topics to which consumers will subscribe
+ * @param subscriptions set of all topic partitions
+ * @return collection of consumer pollers
+ */
+ def subscribeConsumersAndWaitForAssignment(consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+   topicsToSubscribe: List[String],
+   subscriptions: Set[TopicPartition]): Buffer[ConsumerAssignmentPoller] = {
+ val consumerPollers = Buffer[ConsumerAssignmentPoller]()
+ for (consumer <- consumerGroup)
+ consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
+ validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+ consumerPollers
+ }
+
+ /**
+ * Creates 'consumerCount' consumers and consumer pollers, one per consumer; subscribes consumers to
+ * 'topicsToSubscribe' topics, waits until consumers get topics assignment, and validates the assignment
+ * Currently, assignment validation requires that total number of partitions is greater or equal to
+ * number of consumers (i.e. subscriptions.size >= consumerCount)
+ * Assumes that topics are already created with partitions corresponding to a given set of topic partitions ('subscriptions')
+ *
+ * When the function returns, consumer pollers will continue to poll until shutdown is called on every poller.
+ *
+ * @param consumerCount number of consumers to create
+ * @param topicsToSubscribe topics to which consumers will subscribe
+ * @param subscriptions set of all topic partitions
+ * @return collection of created consumers and collection of corresponding consumer pollers
+ */
+ def createConsumerGroupAndWaitForAssignment(consumerCount: Int,
+ topicsToSubscribe: List[String],
+   subscriptions: Set[TopicPartition]): (Buffer[KafkaConsumer[Array[Byte], Array[Byte]]], Buffer[ConsumerAssignmentPoller]) = {
+ assertTrue(consumerCount <= subscriptions.size)
+ val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+ for (i <- 0 until consumerCount)
+ consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+
+ // create consumer pollers, wait for assignment and validate it
+ val consumerPollers = subscribeConsumersAndWaitForAssignment(consumerGroup, topicsToSubscribe, subscriptions)
+
+ (consumerGroup, consumerPollers)
+ }
+
+ /**
+ * Creates 'numOfConsumersToAdd' consumers, adds them to the consumer group 'consumerGroup', and creates corresponding
+ * pollers for these consumers. Waits for partition re-assignment and validates it.
+ *
+ * Currently, assignment validation requires that total number of partitions is greater or equal to
+ * number of consumers, so subscriptions.size must be greater than or equal to the resulting number of consumers in the group
+ *
+ * @param numOfConsumersToAdd number of consumers to create and add to the consumer group
+ * @param consumerGroup current consumer group
+ * @param consumerPollers current consumer pollers
+ * @param topicsToSubscribe topics to which new consumers will subscribe
+ * @param subscriptions set of all topic partitions
+ */
+ def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int,
+   consumerGroup: Buffer[KafkaConsumer[Array[Byte], Array[Byte]]],
+ consumerPollers: Buffer[ConsumerAssignmentPoller],
+ topicsToSubscribe: List[String],
+   subscriptions: Set[TopicPartition]): Unit = {
+ assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
+ for (i <- 0 until numOfConsumersToAdd) {
+ val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+ consumerGroup += newConsumer
+ consumerPollers += subscribeConsumerAndStartPolling(newConsumer, topicsToSubscribe)
+ }
+
+ // wait until topics get re-assigned and validate assignment
+ validateGroupAssignment(consumerPollers, subscriptions,
+ s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added
${numOfConsumersToAdd} consumer(s)")
+ }
+
+ /**
+ * Wait for consumers to get partition assignment and validate it.
+ *
+ * @param consumerPollers consumer pollers corresponding to the consumer group we are testing
+ * @param subscriptions set of all topic partitions
+ * @param msg message to print when waiting for/validating assignment fails
+ */
+ def validateGroupAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
+ subscriptions: Set[TopicPartition],
+ msg: String,
+ waitTime: Long = 10000L): Unit = {
+ TestUtils.waitUntilTrue(() => {
+ val assignments = Buffer[Set[TopicPartition]]()
+ consumerPollers.foreach(assignments += _.consumerAssignment())
+ isPartitionAssignmentValid(assignments, subscriptions)
+ }, msg, waitTime)
+ }
+
+ def changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers: Buffer[ConsumerAssignmentPoller],
+ topicsToSubscribe: List[String],
+   subscriptions: Set[TopicPartition]): Unit = {
+ for (poller <- consumerPollers)
+ poller.subscribe(topicsToSubscribe)
+
+ // since the subscribe call to the poller does not actually call consumer.subscribe right away, wait
+ // until subscribe is called on all consumers
+ TestUtils.waitUntilTrue(() => {
+ consumerPollers forall (poller => poller.isSubscribeRequestProcessed())
+ }, s"Failed to call subscribe on all consumers in the group for subscription ${subscriptions}",
1000L)
+
+ validateGroupAssignment(consumerPollers, subscriptions,
+ s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed
subscription")
+ }
+
}
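
Taken together, the new helpers let the assignment tests read as a few high-level steps. A sketch
condensed from testMultiConsumerRoundRobinAssignment above (local names such as groupConsumers and
groupPollers are illustrative; the helpers and their signatures come from the diff):

  // create topics, then a group of 10 consumers with one poller each, and validate the assignment
  val subscriptions = createTopicAndSendRecords("topic1", 5, 100) ++ createTopicAndSendRecords("topic2", 8, 100)
  val (groupConsumers, groupPollers) = createConsumerGroupAndWaitForAssignment(10, List("topic1", "topic2"), subscriptions)

  // growing the group triggers a rebalance, which is validated against the same set of partitions
  addConsumersToGroupAndWaitForGroupAssignment(1, groupConsumers, groupPollers, List("topic1", "topic2"), subscriptions)

  // done with pollers and consumers
  groupPollers.foreach(_.shutdown())
  groupConsumers.foreach(_.unsubscribe())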