kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7604; Fix flaky unit test `testRebalanceAfterTopicUnavailableWithPatternSubscribe` (#5889)
Date Thu, 08 Nov 2018 13:37:16 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29383d6  KAFKA-7604; Fix flaky unit test `testRebalanceAfterTopicUnavailableWithPatternSubscribe`
(#5889)
29383d6 is described below

commit 29383d6d6a3d42d30e815fbbb084275d449928c8
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Nov 8 05:37:05 2018 -0800

    KAFKA-7604; Fix flaky unit test `testRebalanceAfterTopicUnavailableWithPatternSubscribe`
(#5889)
    
    The problem is the concurrent metadata updates in the foreground and in the heartbeat
thread. Changed the code to use ConsumerNetworkClient.poll, which enforces mutual exclusion
when accessing the underlying client.
---
 .../clients/consumer/internals/ConsumerCoordinatorTest.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 72808c8..b430078 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -558,7 +558,7 @@ public class ConsumerCoordinatorTest {
         // Refresh the metadata again. Since there have been no changes since the last refresh,
it won't trigger
         // rebalance again.
         metadata.requestUpdate();
-        client.poll(Long.MAX_VALUE, time.milliseconds());
+        consumerClient.poll(time.timer(Long.MAX_VALUE));
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
@@ -1010,13 +1010,13 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         Map<String, List<String>> memberSubscriptions = singletonMap(consumerId,
singletonList(topic1));
-        partitionAssignor.prepare(Collections.<String, List<TopicPartition>>emptyMap());
+        partitionAssignor.prepare(Collections.emptyMap());
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions,
Errors.NONE));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(),
Errors.NONE));
+        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
+        assertEquals(Collections.emptySet(), rebalanceListener.assigned);
         assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
 
         Map<String, Errors> topicErrors = new HashMap<>();
@@ -1026,7 +1026,7 @@ public class ConsumerCoordinatorTest {
         client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
                 topicErrors, singletonMap(topic1, 1)));
 
-        client.poll(0, time.milliseconds());
+        consumerClient.poll(time.timer(0));
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions,
Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));


Mime
View raw message