kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4147; Fix transient failure in ConsumerCoordinatorTest.testAutoCommitDynamicAssignment
Date Sun, 11 Sep 2016 07:46:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ac99a3c86 -> 1933f12a5


KAFKA-4147; Fix transient failure in ConsumerCoordinatorTest.testAutoCommitDynamicAssignment

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1841 from hachikuji/KAFKA-4147


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1933f12a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1933f12a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1933f12a

Branch: refs/heads/trunk
Commit: 1933f12a5368f1f60816d986a2777fa324754f80
Parents: ac99a3c
Author: Jason Gustafson <jason@confluent.io>
Authored: Sun Sep 11 08:46:20 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sun Sep 11 08:46:20 2016 +0100

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 10 +++++--
 .../internals/ConsumerCoordinatorTest.java      | 29 ++++++++++----------
 2 files changed, 23 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1933f12a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 0766f3d..7ae79c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -275,6 +275,11 @@ public abstract class AbstractCoordinator implements Closeable {
             heartbeatThread.start();
         }
 
+        doJoinGroup();
+    }
+
+    // visible for testing. Joins the group without starting the heartbeat thread.
+    synchronized void doJoinGroup() {
         while (needRejoin()) {
             ensureCoordinatorReady();
 
@@ -311,7 +316,9 @@ public abstract class AbstractCoordinator implements Closeable {
                             joinFuture = null;
                             state = MemberState.STABLE;
                             needsJoinPrepare = true;
-                            heartbeatThread.enable();
+
+                            if (heartbeatThread != null)
+                                heartbeatThread.enable();
                         }
 
                         onJoinComplete(generation.generationId, generation.memberId, generation.protocol, value);
@@ -818,7 +825,6 @@ public abstract class AbstractCoordinator implements Closeable {
         @Override
         public void run() {
             try {
-
                 while (true) {
                     synchronized (AbstractCoordinator.this) {
                         if (closed)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1933f12a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 0486e6c..624e144 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -451,7 +451,7 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
@@ -471,7 +471,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -498,7 +498,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -529,7 +529,7 @@ public class ConsumerCoordinatorTest {
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
     }
 
     @Test
@@ -555,7 +555,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
@@ -578,7 +578,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
@@ -607,7 +607,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
@@ -728,7 +728,7 @@ public class ConsumerCoordinatorTest {
         // join the group once
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         assertEquals(1, rebalanceListener.revokedCount);
         assertTrue(rebalanceListener.revoked.isEmpty());
@@ -739,7 +739,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(new HashSet<>(Arrays.asList(topicName, otherTopic)), rebalanceListener);
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         assertEquals(2, rebalanceListener.revokedCount);
         assertEquals(singleton(tp), rebalanceListener.revoked);
@@ -759,7 +759,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
+
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
@@ -776,7 +777,7 @@ public class ConsumerCoordinatorTest {
 
         // coordinator doesn't like the session timeout
         client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
     }
 
     @Test
@@ -810,7 +811,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         subscriptions.seek(tp, 100);
 
@@ -839,7 +840,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         subscriptions.seek(tp, 100);
 
@@ -935,7 +936,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.poll(time.milliseconds());
+        coordinator.doJoinGroup();
 
         // now switch to manual assignment
         client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());


Mime
View raw message