kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2743: Make forwarded task reconfiguration requests asynchronous, with backoff on retrying.
Date Thu, 05 Nov 2015 16:44:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7d6515fb8 -> b30d68a4e


KAFKA-2743: Make forwarded task reconfiguration requests asynchronous, with backoff on retrying.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Guozhang Wang

Closes #422 from ewencp/task-reconfiguration-async-with-backoff


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b30d68a4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b30d68a4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b30d68a4

Branch: refs/heads/trunk
Commit: b30d68a4e31cb49c2e664bb2e3263e056a27db40
Parents: 7d6515f
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Thu Nov 5 08:49:58 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Nov 5 08:49:58 2015 -0800

----------------------------------------------------------------------
 .../runtime/distributed/DistributedHerder.java  | 264 +++++++++++++------
 .../distributed/DistributedHerderTest.java      |   5 +-
 2 files changed, 187 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b30d68a4/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
index 4c88737..96de1ca 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -19,6 +19,8 @@ package org.apache.kafka.copycat.runtime.distributed;
 
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.errors.AlreadyExistsException;
@@ -46,12 +48,14 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,9 +83,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class DistributedHerder implements Herder, Runnable {
     private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
 
+    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
+
     private final Worker worker;
     private final KafkaConfigStorage configStorage;
     private ClusterConfigState configState;
+    private final Time time;
 
     private final int workerSyncTimeoutMs;
     private final int workerUnsyncBackoffMs;
@@ -97,18 +104,20 @@ public class DistributedHerder implements Herder, Runnable {
 
     // To handle most external requests, like creating or destroying a connector, we can
use a generic request where
     // the caller specifies all the code that should be executed.
-    private final Queue<HerderRequest> requests = new LinkedBlockingDeque<>();
+    private final Queue<HerderRequest> requests = new PriorityQueue<>();
     // Config updates can be collected and applied together when possible. Also, we need
to take care to rebalance when
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
     private boolean needsReconfigRebalance;
 
+    private final ExecutorService forwardRequestExecutor;
+
     public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) {
-        this(config, worker, null, null, restUrl);
+        this(config, worker, null, null, restUrl, new SystemTime());
     }
 
     // public for testing
-    public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage
configStorage, WorkerGroupMember member, String restUrl) {
+    public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage
configStorage, WorkerGroupMember member, String restUrl, Time time) {
         this.worker = worker;
         if (configStorage != null) {
             // For testing. Assume configuration has already been performed
@@ -118,6 +127,7 @@ public class DistributedHerder implements Herder, Runnable {
             this.configStorage.configure(config.originals());
         }
         configState = ClusterConfigState.EMPTY;
+        this.time = time;
 
         this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
         this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
@@ -127,11 +137,13 @@ public class DistributedHerder implements Herder, Runnable {
 
         rebalanceResolved = true; // If we still need to follow up after a rebalance occurred,
starting up tasks
         needsReconfigRebalance = false;
+
+        forwardRequestExecutor = Executors.newSingleThreadExecutor();
     }
 
     @Override
     public void start() {
-        Thread thread = new Thread(this);
+        Thread thread = new Thread(this, "DistributedHerder");
         thread.start();
     }
 
@@ -152,6 +164,7 @@ public class DistributedHerder implements Herder, Runnable {
             log.info("Herder stopped");
         } catch (Throwable t) {
             log.error("Uncaught exception in herder work thread, exiting: ", t);
+            stopLatch.countDown();
             System.exit(1);
         } finally {
             stopLatch.countDown();
@@ -177,14 +190,27 @@ public class DistributedHerder implements Herder, Runnable {
         }
 
         // Process any external requests
-        while (!requests.isEmpty()) {
-            HerderRequest request = requests.poll();
-            Callback<Void> cb = request.callback();
+        final long now = time.milliseconds();
+        long nextRequestTimeoutMs = Long.MAX_VALUE;
+        while (true) {
+            final HerderRequest next;
+            synchronized (this) {
+                next = requests.peek();
+                if (next == null) {
+                    break;
+                } else if (now >= next.at) {
+                    requests.poll();
+                } else {
+                    nextRequestTimeoutMs = next.at - now;
+                    break;
+                }
+            }
+
             try {
-                request.action().call();
-                cb.onCompletion(null, null);
+                next.action().call();
+                next.callback().onCompletion(null, null);
             } catch (Throwable t) {
-                cb.onCompletion(t, null);
+                next.callback().onCompletion(t, null);
             }
         }
 
@@ -237,7 +263,7 @@ public class DistributedHerder implements Herder, Runnable {
 
         // Let the group take any actions it needs to
         try {
-            member.poll(Long.MAX_VALUE);
+            member.poll(nextRequestTimeoutMs);
             // Ensure we're in a good state in our group. If not restart and everything should
be setup to rejoin
             if (!handleRebalanceCompleted()) return;
         } catch (WakeupException e) { // FIXME should not be WakeupException
@@ -292,13 +318,24 @@ public class DistributedHerder implements Herder, Runnable {
                 // ignore, should not happen
             }
         }
+
+
+        forwardRequestExecutor.shutdown();
+        try {
+            if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS))
+                forwardRequestExecutor.shutdownNow();
+        } catch (InterruptedException e) {
+            // ignore
+        }
+
+        log.info("Herder stopped");
     }
 
     @Override
     public synchronized void connectors(final Callback<Collection<String>> callback)
{
         log.trace("Submitting connector listing request");
 
-        requests.add(new HerderRequest(
+        addRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
@@ -308,16 +345,16 @@ public class DistributedHerder implements Herder, Runnable {
                         callback.onCompletion(null, configState.connectors());
                         return null;
                     }
-                }
-        ));
-        member.wakeup();
+                },
+                forwardErrorCallback(callback)
+        );
     }
 
     @Override
     public synchronized void connectorInfo(final String connName, final Callback<ConnectorInfo>
callback) {
         log.trace("Submitting connector info request {}", connName);
 
-        requests.add(new HerderRequest(
+        addRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
@@ -331,9 +368,9 @@ public class DistributedHerder implements Herder, Runnable {
                         }
                         return null;
                     }
-                }
-        ));
-        member.wakeup();
+                },
+                forwardErrorCallback(callback)
+        );
     }
 
     @Override
@@ -366,7 +403,7 @@ public class DistributedHerder implements Herder, Runnable {
 
         log.trace("Submitting connector config write request {}", connName);
 
-        requests.add(new HerderRequest(
+        addRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
@@ -399,31 +436,38 @@ public class DistributedHerder implements Herder, Runnable {
 
                         return null;
                     }
-                }));
-        member.wakeup();
+                },
+                forwardErrorCallback(callback)
+        );
     }
 
     @Override
     public synchronized void requestTaskReconfiguration(final String connName) {
         log.trace("Submitting connector task reconfiguration request {}", connName);
 
-        requests.add(new HerderRequest(
+        addRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
-                        reconfigureConnector(connName);
+                        reconfigureConnectorTasksWithRetry(connName);
                         return null;
                     }
+                },
+                new Callback<Void>() {
+                    @Override
+                    public void onCompletion(Throwable error, Void result) {
+                        log.error("Unexpected error during task reconfiguration: ", error);
+                        log.error("Task reconfiguration for {} failed unexpectedly, this
connector will not be properly reconfigured unless manually triggered.", connName);
+                    }
                 }
-        ));
-        member.wakeup();
+        );
     }
 
     @Override
     public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>>
callback) {
         log.trace("Submitting get task configuration request {}", connName);
 
-        requests.add(new HerderRequest(
+        addRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
@@ -442,16 +486,16 @@ public class DistributedHerder implements Herder, Runnable {
                         }
                         return null;
                     }
-                }
-        ));
-        member.wakeup();
+                },
+                forwardErrorCallback(callback)
+        );
     }
 
     @Override
     public synchronized void putTaskConfigs(final String connName, final List<Map<String,
String>> configs, final Callback<Void> callback) {
         log.trace("Submitting put task configuration request {}", connName);
 
-        requests.add(new HerderRequest(
+        addRequest(
                 new Callable<Void>() {
                     @Override
                     public Void call() throws Exception {
@@ -465,9 +509,9 @@ public class DistributedHerder implements Herder, Runnable {
                         }
                         return null;
                     }
-                }
-        ));
-        member.wakeup();
+                },
+                forwardErrorCallback(callback)
+        );
     }
 
 
@@ -626,48 +670,92 @@ public class DistributedHerder implements Herder, Runnable {
         // Immediately request configuration since this could be a brand new connector. However,
also only update those
         // task configs if they are actually different from the existing ones to avoid unnecessary
updates when this is
         // just restoring an existing connector.
-        reconfigureConnector(connName);
+        reconfigureConnectorTasksWithRetry(connName);
+    }
+
+    private void reconfigureConnectorTasksWithRetry(final String connName) {
+        reconfigureConnector(connName, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                // If we encountered an error, we don't have much choice but to just retry.
If we don't, we could get
+                // stuck with a connector that thinks it has generated tasks, but wasn't
actually successful and therefore
+                // never makes progress. The retry has to run through a HerderRequest since
this callback could be happening
+                // from the HTTP request forwarding thread.
+                if (error != null) {
+                    log.error("Failed to reconfigure connector's tasks, retrying after backoff:",
error);
+                    addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS,
+                            new Callable<Void>() {
+                                @Override
+                                public Void call() throws Exception {
+                                    reconfigureConnectorTasksWithRetry(connName);
+                                    return null;
+                                }
+                            }, new Callback<Void>() {
+                                @Override
+                                public void onCompletion(Throwable error, Void result) {
+                                    log.error("Unexpected error during connector task reconfiguration:
", error);
+                                    log.error("Task reconfiguration for {} failed unexpectedly,
this connector will not be properly reconfigured unless manually triggered.", connName);
+                                }
+                            }
+                    );
+                }
+            }
+        });
     }
 
     // Updates configurations for a connector by requesting them from the connector, filling
in parameters provided
     // by the system, then checks whether any configs have actually changed before submitting
the new configs to storage
-    private void reconfigureConnector(String connName) {
-        Map<String, String> configs = configState.connectorConfig(connName);
-        ConnectorConfig connConfig = new ConnectorConfig(configs);
-
-        List<String> sinkTopics = null;
-        if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
-            sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
-
-        List<Map<String, String>> taskProps
-                = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
sinkTopics);
-        boolean changed = false;
-        int currentNumTasks = configState.taskCount(connName);
-        if (taskProps.size() != currentNumTasks) {
-            log.debug("Change in connector task count from {} to {}, writing updated task
configurations", currentNumTasks, taskProps.size());
-            changed = true;
-        } else {
-            int index = 0;
-            for (Map<String, String> taskConfig : taskProps) {
-                if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName,
index)))) {
-                    log.debug("Change in task configurations, writing updated task configurations");
-                    changed = true;
-                    break;
+    private void reconfigureConnector(final String connName, final Callback<Void> cb)
{
+        try {
+            Map<String, String> configs = configState.connectorConfig(connName);
+            ConnectorConfig connConfig = new ConnectorConfig(configs);
+
+            List<String> sinkTopics = null;
+            if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
+                sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
+
+            final List<Map<String, String>> taskProps
+                    = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
sinkTopics);
+            boolean changed = false;
+            int currentNumTasks = configState.taskCount(connName);
+            if (taskProps.size() != currentNumTasks) {
+                log.debug("Change in connector task count from {} to {}, writing updated
task configurations", currentNumTasks, taskProps.size());
+                changed = true;
+            } else {
+                int index = 0;
+                for (Map<String, String> taskConfig : taskProps) {
+                    if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName,
index)))) {
+                        log.debug("Change in task configurations, writing updated task configurations");
+                        changed = true;
+                        break;
+                    }
+                    index++;
                 }
-                index++;
             }
-        }
-        if (changed) {
-            if (isLeader()) {
-                configStorage.putTaskConfigs(taskConfigListAsMap(connName, taskProps));
-            } else {
-                try {
-                    String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" +
connName + "/tasks");
-                    RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
-                } catch (CopycatException e) {
-                    log.error("Request to leader to reconfigure connector tasks failed",
e);
+            if (changed) {
+                if (isLeader()) {
+                    configStorage.putTaskConfigs(taskConfigListAsMap(connName, taskProps));
+                    cb.onCompletion(null, null);
+                } else {
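+                    // We cannot forward the request on the same thread because this reconfiguration can happen as a
+                    // result of work running on the herder thread. If we blocked here on the HTTP request to the
+                    // leader, the herder thread could not make progress until that request completed.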
+                    forwardRequestExecutor.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/"
+ connName + "/tasks");
+                                RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
+                                cb.onCompletion(null, null);
+                            } catch (CopycatException e) {
+                                log.error("Request to leader to reconfigure connector tasks
failed", e);
+                                cb.onCompletion(e, null);
+                            }
+                        }
+                    });
                 }
             }
+        } catch (Throwable t) {
+            cb.onCompletion(t, null);
         }
     }
 
@@ -684,21 +772,28 @@ public class DistributedHerder implements Herder, Runnable {
         return true;
     }
 
+    private void addRequest(Callable<Void> action, Callback<Void> callback) {
+        addRequest(0, action, callback);
+    }
+
+    private void addRequest(long delayMs, Callable<Void> action, Callback<Void>
callback) {
+        HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback);
+        requests.add(req);
+        if (requests.peek() == req)
+            member.wakeup();
+    }
 
-    private class HerderRequest {
+    private class HerderRequest implements Comparable<HerderRequest> {
+        private final long at;
         private final Callable<Void> action;
         private final Callback<Void> callback;
 
-        public HerderRequest(Callable<Void> action, Callback<Void> callback)
{
+        public HerderRequest(long at, Callable<Void> action, Callback<Void> callback)
{
+            this.at = at;
             this.action = action;
             this.callback = callback;
         }
 
-        public HerderRequest(Callable<Void> action) {
-            this.action = action;
-            this.callback = DEFAULT_CALLBACK;
-        }
-
         public Callable<Void> action() {
             return action;
         }
@@ -706,14 +801,21 @@ public class DistributedHerder implements Herder, Runnable {
         public Callback<Void> callback() {
             return callback;
         }
-    }
 
-    private static final Callback<Void> DEFAULT_CALLBACK = new Callback<Void>()
{
         @Override
-        public void onCompletion(Throwable error, Void result) {
-            if (error != null)
-                log.error("HerderRequest's action threw an exception: ", error);
+        public int compareTo(HerderRequest o) {
+            return Long.compare(at, o.at);
         }
+    }
+
+    private static final Callback<Void> forwardErrorCallback(final Callback<?>
callback) {
+        return new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                if (error != null)
+                    callback.onCompletion(error, null);
+            }
+        };
     };
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b30d68a4/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
index 8f28f5f..7873447 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.copycat.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.errors.AlreadyExistsException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
@@ -124,6 +125,7 @@ public class DistributedHerderTest {
 
     @Mock private KafkaConfigStorage configStorage;
     @Mock private WorkerGroupMember member;
+    private MockTime time;
     private DistributedHerder herder;
     @Mock private Worker worker;
     @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
@@ -135,9 +137,10 @@ public class DistributedHerderTest {
     @Before
     public void setUp() throws Exception {
         worker = PowerMock.createMock(Worker.class);
+        time = new MockTime();
 
         herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
-                new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL);
+                new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL,
time);
         connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
         taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
         rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");


Mime
View raw message