kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4160: Ensure rebalance listener not called with coordinator lock
Date Thu, 15 Sep 2016 05:31:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 084a19e9a -> f197ad499


KAFKA-4160: Ensure rebalance listener not called with coordinator lock

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1855 from hachikuji/KAFKA-4160


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f197ad49
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f197ad49
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f197ad49

Branch: refs/heads/trunk
Commit: f197ad4997032a848540a7d577b5846f76a26bfb
Parents: 084a19e
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Sep 14 22:31:52 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Sep 14 22:31:52 2016 -0700

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 100 +++++++++++--------
 .../internals/ConsumerCoordinatorTest.java      |  28 +++---
 .../tests/streams/streams_smoke_test.py         |   1 -
 3 files changed, 73 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f197ad49/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2f36239..159ac27 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -265,21 +265,28 @@ public abstract class AbstractCoordinator implements Closeable {
     /**
      * Ensure that the group is active (i.e. joined and synced)
      */
-    public synchronized void ensureActiveGroup() {
+    public void ensureActiveGroup() {
         // always ensure that the coordinator is ready because we may have been disconnected
         // when sending heartbeats and does not necessarily require us to rejoin the group.
         ensureCoordinatorReady();
+        startHeartbeatThreadIfNeeded();
+        joinGroupIfNeeded();
+    }
 
+    private synchronized void startHeartbeatThreadIfNeeded() {
         if (heartbeatThread == null) {
             heartbeatThread = new HeartbeatThread();
             heartbeatThread.start();
         }
+    }
 
-        doJoinGroup();
+    private synchronized void disableHeartbeatThread() {
+        if (heartbeatThread != null)
+            heartbeatThread.disable();
     }
 
     // visible for testing. Joins the group without starting the heartbeat thread.
-    synchronized void doJoinGroup() {
+    void joinGroupIfNeeded() {
         while (needRejoin()) {
             ensureCoordinatorReady();
 
@@ -293,6 +300,11 @@ public abstract class AbstractCoordinator implements Closeable {
                 needsJoinPrepare = false;
             }
 
+            // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
+            // Note that this must come after the call to onJoinPrepare since we must be able to continue
+            // sending heartbeats if that callback takes some time.
+            disableHeartbeatThread();
+
             // ensure that there are no pending requests to the coordinator. This is important
             // in particular to avoid resending a pending JoinGroup request.
             if (client.pendingRequestCount(this.coordinator) > 0) {
@@ -300,46 +312,14 @@ public abstract class AbstractCoordinator implements Closeable {
                 continue;
             }
 
-            // we store the join future in case we are woken up by the user after beginning the
-            // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
-            // to rejoin before the pending rebalance has completed.
-            if (joinFuture == null) {
-                state = MemberState.REBALANCING;
-                joinFuture = sendJoinGroupRequest();
-                joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
-                    @Override
-                    public void onSuccess(ByteBuffer value) {
-                        // handle join completion in the callback so that the callback will be invoked
-                        // even if the consumer is woken up before finishing the rebalance
-                        synchronized (AbstractCoordinator.this) {
-                            log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
-                            joinFuture = null;
-                            state = MemberState.STABLE;
-                            needsJoinPrepare = true;
-
-                            if (heartbeatThread != null)
-                                heartbeatThread.enable();
-                        }
-
-                        onJoinComplete(generation.generationId, generation.memberId, generation.protocol, value);
-                    }
-
-                    @Override
-                    public void onFailure(RuntimeException e) {
-                        // we handle failures below after the request finishes. if the join completes
-                        // after having been woken up, the exception is ignored and we will rejoin
-                        synchronized (AbstractCoordinator.this) {
-                            joinFuture = null;
-                            state = MemberState.UNJOINED;
-                        }
-                    }
-                });
-            }
-
-            RequestFuture<ByteBuffer> future = joinFuture;
+            RequestFuture<ByteBuffer> future = initiateJoinGroup();
             client.poll(future);
+            resetJoinGroupFuture();
 
-            if (future.failed()) {
+            if (future.succeeded()) {
+                needsJoinPrepare = true;
+                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
+            } else {
                 RuntimeException exception = future.exception();
                 if (exception instanceof UnknownMemberIdException ||
                         exception instanceof RebalanceInProgressException ||
@@ -352,6 +332,44 @@ public abstract class AbstractCoordinator implements Closeable {
         }
     }
 
+    private synchronized void resetJoinGroupFuture() {
+        this.joinFuture = null;
+    }
+
+    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
+        // we store the join future in case we are woken up by the user after beginning the
+        // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
+        // to rejoin before the pending rebalance has completed.
+        if (joinFuture == null) {
+            state = MemberState.REBALANCING;
+            joinFuture = sendJoinGroupRequest();
+            joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
+                @Override
+                public void onSuccess(ByteBuffer value) {
+                    // handle join completion in the callback so that the callback will be invoked
+                    // even if the consumer is woken up before finishing the rebalance
+                    synchronized (AbstractCoordinator.this) {
+                        log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
+                        state = MemberState.STABLE;
+
+                        if (heartbeatThread != null)
+                            heartbeatThread.enable();
+                    }
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    // we handle failures below after the request finishes. if the join completes
+                    // after having been woken up, the exception is ignored and we will rejoin
+                    synchronized (AbstractCoordinator.this) {
+                        state = MemberState.UNJOINED;
+                    }
+                }
+            });
+        }
+        return joinFuture;
+    }
+
     /**
      * Join the group and return the assignment for the next generation. This function handles both
      * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if

http://git-wip-us.apache.org/repos/asf/kafka/blob/f197ad49/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 624e144..924a582 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -451,7 +451,7 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
@@ -471,7 +471,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -498,7 +498,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -529,7 +529,7 @@ public class ConsumerCoordinatorTest {
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
     }
 
     @Test
@@ -555,7 +555,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
@@ -578,7 +578,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
@@ -607,7 +607,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
 
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
@@ -728,7 +728,7 @@ public class ConsumerCoordinatorTest {
         // join the group once
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         assertEquals(1, rebalanceListener.revokedCount);
         assertTrue(rebalanceListener.revoked.isEmpty());
@@ -739,7 +739,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(new HashSet<>(Arrays.asList(topicName, otherTopic)), rebalanceListener);
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         assertEquals(2, rebalanceListener.revokedCount);
         assertEquals(singleton(tp), rebalanceListener.revoked);
@@ -759,7 +759,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(tp), subscriptions.assignedPartitions());
@@ -777,7 +777,7 @@ public class ConsumerCoordinatorTest {
 
         // coordinator doesn't like the session timeout
         client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
     }
 
     @Test
@@ -811,7 +811,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         subscriptions.seek(tp, 100);
 
@@ -840,7 +840,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         subscriptions.seek(tp, 100);
 
@@ -936,7 +936,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code()));
-        coordinator.doJoinGroup();
+        coordinator.joinGroupIfNeeded();
 
         // now switch to manual assignment
         client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());

http://git-wip-us.apache.org/repos/asf/kafka/blob/f197ad49/tests/kafkatest/tests/streams/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index ea05c5f..e3c465a 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -44,7 +44,6 @@ class StreamsSmokeTest(KafkaTest):
         self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
         self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @ignore
     def test_streams(self):
         """
         Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.


Mime
View raw message