kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3299: Ensure that reading config log on rebalance doesn't hang the herder
Date Fri, 04 Mar 2016 22:21:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6d65edeba -> b7d6fae59


KAFKA-3299: Ensure that reading config log on rebalance doesn't hang the herder

Author: Gwen Shapira <cshapi@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #981 from gwenshap/KAFKA-3299


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b7d6fae5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b7d6fae5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b7d6fae5

Branch: refs/heads/trunk
Commit: b7d6fae59ed782833981675efa0304ce2b12a59e
Parents: 6d65ede
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Fri Mar 4 14:21:22 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Mar 4 14:21:22 2016 -0800

----------------------------------------------------------------------
 .../runtime/distributed/DistributedHerder.java  | 32 ++++++++++++--------
 .../runtime/distributed/WorkerGroupMember.java  |  4 +++
 .../distributed/DistributedHerderTest.java      |  2 ++
 3 files changed, 25 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d6fae5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 84ad6e0..16b950b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -100,6 +100,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     // and the from other nodes are safe to process
     private boolean rebalanceResolved;
     private ConnectProtocol.Assignment assignment;
+    private boolean canReadConfigs;
 
     // To handle most external requests, like creating or destroying a connector, we can use a generic request where
     // the caller specifies all the code that should be executed.
@@ -150,6 +151,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
         rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
         needsReconfigRebalance = false;
+        canReadConfigs = true; // We didn't try yet, but Configs are readable until proven otherwise
 
         forwardRequestExecutor = Executors.newSingleThreadExecutor();
     }
@@ -206,6 +208,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         // blocking up this thread (especially those in callbacks due to rebalance events).
 
         try {
+            // if we failed to read to end of log before, we need to make sure the issue was resolved before joining group
+            // Joining and immediately leaving for failure to read configs is exceedingly impolite
+            if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs))
+                return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us
+
             member.ensureActive();
             // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
             if (!handleRebalanceCompleted()) return;
@@ -574,21 +581,18 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         //      even attempting to. If we can't we should drop out of the group because we will block everyone from making
         //      progress. We can backoff and try rejoining later.
         //  1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately,
-        //      otherwise, we just want to wait indefinitely to catch up and rejoin whenever we're finally ready.
+        //      otherwise, we just want to wait reasonable amount of time to catch up and rejoin if we are ready.
         // 2. Assignment succeeded.
         //  2a. We are caught up on configs. Awesome! We can proceed to run our assigned work.
-        //  2b. We need to try to catch up. We can do this potentially indefinitely because if it takes to long, we'll
-        //      be kicked out of the group anyway due to lack of heartbeats.
+        //  2b. We need to try to catch up - try reading configs for reasonable amount of time.
 
         boolean needsReadToEnd = false;
-        long syncConfigsTimeoutMs = Long.MAX_VALUE;
         boolean needsRejoin = false;
         if (assignment.failed()) {
             needsRejoin = true;
             if (isLeader()) {
                 log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
                 needsReadToEnd = true;
-                syncConfigsTimeoutMs = workerSyncTimeoutMs;
             } else if (configState.offset() < assignment.offset()) {
                 log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
                 needsReadToEnd = true;
@@ -604,11 +608,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
         if (needsReadToEnd) {
             // Force exiting this method to avoid creating any connectors/tasks and require immediate rejoining if
-            // we timed out. This should only happen if we were the leader and didn't finish quickly enough, in which
-            // case we've waited a long time and should have already left the group OR the timeout should have been
-            // very long and not having finished also indicates we've waited longer than the session timeout.
-            if (!readConfigToEnd(syncConfigsTimeoutMs))
+            // we timed out. This should only happen if we failed to read configuration for long enough,
+            // in which case giving back control to the main loop will prevent hanging around indefinitely after getting kicked out of the group.
+            // We also indicate to the main loop that we failed to readConfigs so it will check that the issue was resolved before trying to join the group
+            if (!readConfigToEnd(workerSyncTimeoutMs)) {
+                canReadConfigs = false;
                 needsRejoin = true;
+            }
         }
 
         if (needsRejoin) {
@@ -646,11 +652,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
             return true;
         } catch (TimeoutException e) {
+            // in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already)
+            // and back off to avoid a tight loop of rejoin-attempt-to-catch-up-leave
             log.warn("Didn't reach end of config log quickly enough", e);
-            // TODO: With explicit leave group support, it would be good to explicitly leave the group *before* this
-            // backoff since it'll be longer than the session timeout
-            if (isLeader())
-                backoff(workerUnsyncBackoffMs);
+            member.maybeLeaveGroup();
+            backoff(workerUnsyncBackoffMs);
             return false;
         } catch (InterruptedException | ExecutionException e) {
             throw new ConnectException("Error trying to catch up after assignment", e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d6fae5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 9f05040..7294ed4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -170,6 +170,10 @@ public class WorkerGroupMember {
         coordinator.requestRejoin();
     }
 
+    public void maybeLeaveGroup() {
+        coordinator.maybeLeaveGroup();
+    }
+
     private void stop(boolean swallowException) {
         log.trace("Stopping the Connect group member.");
         AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7d6fae5/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 8017ecf..aa747f6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -440,6 +440,8 @@ public class DistributedHerderTest {
         TestFuture<Void> readToEndFuture = new TestFuture<>();
         readToEndFuture.resolveOnGet(new TimeoutException());
         EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
+        member.maybeLeaveGroup();
+        EasyMock.expectLastCall();
         PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
         member.requestRejoin();
 


Mime
View raw message