kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8098: Fix Flaky Test testConsumerGroups
Date Wed, 20 Mar 2019 14:01:52 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 5918ac2  KAFKA-8098: Fix Flaky Test testConsumerGroups
5918ac2 is described below

commit 5918ac2981caa5d08eaa92765e818cb164449757
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Wed Mar 20 22:01:31 2019 +0800

    KAFKA-8098: Fix Flaky Test testConsumerGroups
    
    - The flaky failure is caused by the main thread sometimes issuing a DescribeConsumerGroup
    request before the consumer assignment takes effect. Added a latch to make sure this
    situation cannot happen.
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
---
 .../kafka/api/AdminClientIntegrationTest.scala          | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index e740283..08b2b01 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -19,7 +19,7 @@ package kafka.api
 import java.util
 import java.util.{Collections, Properties}
 import java.util.Arrays.asList
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.io.File
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
@@ -54,6 +54,7 @@ import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
 
 import java.lang.{Long => JLong}
+import java.time.{Duration => JDuration}
 
 /**
  * An integration test of the KafkaAdminClient.
@@ -1123,19 +1124,27 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
       newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
       val consumer = createConsumer(configOverrides = newConsumerConfig)
+      val latch = new CountDownLatch(1)
       try {
         // Start a consumer in a thread that will subscribe to a new group.
         val consumerThread = new Thread {
           override def run {
             consumer.subscribe(Collections.singleton(testTopicName))
-            while (true) {
-              consumer.poll(5000)
-              consumer.commitSync()
+            try {
+              while (true) {
+                consumer.poll(JDuration.ofSeconds(5))
+                if (!consumer.assignment.isEmpty && latch.getCount > 0L)
+                  latch.countDown()
+                consumer.commitSync()
+              }
+            } catch {
+              case _: InterruptException => // Suppress the output to stderr
             }
           }
         }
         try {
           consumerThread.start
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
           // Test that we can list the new group.
           TestUtils.waitUntilTrue(() => {
             val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId ==
testGroupId)


Mime
View raw message