This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 5918ac2 KAFKA-8098: Fix Flaky Test testConsumerGroups
5918ac2 is described below
commit 5918ac2981caa5d08eaa92765e818cb164449757
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Wed Mar 20 22:01:31 2019 +0800
KAFKA-8098: Fix Flaky Test testConsumerGroups
- The flaky failure occurs because the main thread sometimes issues the DescribeConsumerGroup
request before the consumer's assignment has taken effect. Added a latch so that the main thread
waits until the consumer has a non-empty assignment before it queries the group.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
---
.../kafka/api/AdminClientIntegrationTest.scala | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index e740283..08b2b01 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -19,7 +19,7 @@ package kafka.api
import java.util
import java.util.{Collections, Properties}
import java.util.Arrays.asList
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.io.File
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
@@ -54,6 +54,7 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import java.lang.{Long => JLong}
+import java.time.{Duration => JDuration}
/**
* An integration test of the KafkaAdminClient.
@@ -1123,19 +1124,27 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
val consumer = createConsumer(configOverrides = newConsumerConfig)
+ val latch = new CountDownLatch(1)
try {
// Start a consumer in a thread that will subscribe to a new group.
val consumerThread = new Thread {
override def run {
consumer.subscribe(Collections.singleton(testTopicName))
- while (true) {
- consumer.poll(5000)
- consumer.commitSync()
+ try {
+ while (true) {
+ consumer.poll(JDuration.ofSeconds(5))
+ if (!consumer.assignment.isEmpty && latch.getCount > 0L)
+ latch.countDown()
+ consumer.commitSync()
+ }
+ } catch {
+ case _: InterruptException => // Suppress the output to stderr
}
}
}
try {
consumerThread.start
+ assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
// Test that we can list the new group.
TestUtils.waitUntilTrue(() => {
val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId ==
testGroupId)
|