kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8098: Fix Flaky Test testConsumerGroups
Date Wed, 20 Mar 2019 05:18:26 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f6f8da7  KAFKA-8098: Fix Flaky Test testConsumerGroups
f6f8da7 is described below

commit f6f8da70713be6486b9ca085f4130043a3b9e9aa
Author: huxihx <huxi_2b@hotmail.com>
AuthorDate: Wed Mar 20 10:47:34 2019 +0530

    KAFKA-8098: Fix Flaky Test testConsumerGroups
    
    - The flaky failure is caused by the fact that the main thread sometimes issues DescribeConsumerGroup
request before the consumer assignment takes effect. Added a latch to make sure such situation
is not going to happen.
    
    Author: huxihx <huxi_2b@hotmail.com>
    Author: huxi <huxi_2b@hotmail.com>
    Author: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Closes #6441 from huxihx/KAFKA-8098
---
 .../kafka/api/AdminClientIntegrationTest.scala         | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 027266d..5c72cbf 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -19,7 +19,7 @@ package kafka.api
 import java.{time, util}
 import java.util.{Collections, Properties}
 import java.util.Arrays.asList
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.io.File
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
@@ -52,6 +52,7 @@ import kafka.zk.KafkaZkClient
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
 import java.lang.{Long => JLong}
+import java.time.{Duration => JDuration}
 
 import kafka.security.auth.{Cluster, Group, Topic}
 
@@ -1158,19 +1159,28 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
       newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
       newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
       val consumer = createConsumer(configOverrides = newConsumerConfig)
+      val latch = new CountDownLatch(1)
       try {
         // Start a consumer in a thread that will subscribe to a new group.
         val consumerThread = new Thread {
           override def run {
             consumer.subscribe(Collections.singleton(testTopicName))
-            while (true) {
-              consumer.poll(time.Duration.ofSeconds(5L))
-              consumer.commitSync()
+
+            try {
+              while (true) {
+                consumer.poll(JDuration.ofSeconds(5))
+                if (!consumer.assignment.isEmpty && latch.getCount > 0L)
+                  latch.countDown()
+                consumer.commitSync()
+              }
+            } catch {
+              case _: InterruptException => // Suppress the output to stderr
             }
           }
         }
         try {
           consumerThread.start
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
           // Test that we can list the new group.
           TestUtils.waitUntilTrue(() => {
             val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId ==
testGroupId)


Mime
View raw message