kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2860: better handling of auto commit errors
Date Fri, 20 Nov 2015 16:27:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 26f797931 -> 07e214130


KAFKA-2860: better handling of auto commit errors

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #553 from hachikuji/KAFKA-2860


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/07e21413
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/07e21413
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/07e21413

Branch: refs/heads/0.9.0
Commit: 07e214130fe7dc74cf2a5628232b8c3c3ef000bc
Parents: 26f7979
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Nov 18 17:19:47 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Nov 20 08:26:29 2015 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 10 ++-
 .../consumer/internals/ConsumerCoordinator.java | 91 +++++++++++++++-----
 2 files changed, 79 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/07e21413/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a12c6c1..ddaa728 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -242,12 +242,16 @@ public abstract class AbstractCoordinator implements Closeable {
 
     private class HeartbeatTask implements DelayedTask {
 
+        private boolean requestInFlight = false;
+
         public void reset() {
             // start or restart the heartbeat task to be executed at the next chance
             long now = time.milliseconds();
             heartbeat.resetSessionTimeout(now);
             client.unschedule(this);
-            client.schedule(this, now);
+
+            if (!requestInFlight)
+                client.schedule(this, now);
         }
 
         @Override
@@ -270,10 +274,13 @@ public abstract class AbstractCoordinator implements Closeable {
                 client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
             } else {
                 heartbeat.sentHeartbeat(now);
+                requestInFlight = true;
+
                 RequestFuture<Void> future = sendHeartbeatRequest();
                 future.addListener(new RequestFutureListener<Void>() {
                     @Override
                     public void onSuccess(Void value) {
+                        requestInFlight = false;
                         long now = time.milliseconds();
                         heartbeat.receiveHeartbeat(now);
                         long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
@@ -282,6 +289,7 @@ public abstract class AbstractCoordinator implements Closeable {
 
                     @Override
                     public void onFailure(RuntimeException e) {
+                        requestInFlight = false;
                         client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                     }
                 });

http://git-wip-us.apache.org/repos/asf/kafka/blob/07e21413/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ca08df0..93be7a8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -69,7 +69,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final SubscriptionState subscriptions;
     private final OffsetCommitCallback defaultOffsetCommitCallback;
     private final boolean autoCommitEnabled;
-    private DelayedTask autoCommitTask = null;
+    private final AutoCommitTask autoCommitTask;
 
     /**
      * Initialize the coordination manager.
@@ -112,9 +112,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         addMetadataListener();
 
-        if (autoCommitEnabled)
-            this.autoCommitTask = scheduleAutoCommitTask(autoCommitIntervalMs);
-
+        this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
     }
 
@@ -179,6 +177,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // give the assignor a chance to update internal state based on the received assignment
         assignor.onAssignment(assignment);
 
+        // restart the autocommit task if needed
+        if (autoCommitEnabled)
+            autoCommitTask.enable();
+
         // execute the user's callback after rebalance
         ConsumerRebalanceListener listener = subscriptions.listener();
         log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
@@ -308,8 +310,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     public void close() {
         client.disableWakeups();
         try {
-            if (autoCommitTask != null)
-                client.unschedule(autoCommitTask);
             maybeAutoCommitOffsetsSync();
         } finally {
             super.close();
@@ -361,25 +361,74 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
-    private DelayedTask scheduleAutoCommitTask(final long interval) {
-        DelayedTask task = new DelayedTask() {
-            public void run(long now) {
-                commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
-                    @Override
-                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                        if (exception != null)
-                            log.error("Auto offset commit failed.", exception);
-                    }
-                });
-                client.schedule(this, now + interval);
+    private class AutoCommitTask implements DelayedTask {
+        private final long interval;
+        private boolean enabled = false;
+        private boolean requestInFlight = false;
+
+        public AutoCommitTask(long interval) {
+            this.interval = interval;
+        }
+
+        public void enable() {
+            if (!enabled) {
+                // there shouldn't be any instances scheduled, but call unschedule anyway to ensure
+                // that this task is only ever scheduled once
+                client.unschedule(this);
+                this.enabled = true;
+
+                if (!requestInFlight) {
+                    long now = time.milliseconds();
+                    client.schedule(this, interval + now);
+                }
+            }
+        }
+
+        public void disable() {
+            this.enabled = false;
+            client.unschedule(this);
+        }
+
+        private void reschedule(long at) {
+            if (enabled)
+                client.schedule(this, at);
+        }
+
+        public void run(final long now) {
+            if (!enabled)
+                return;
+
+            if (coordinatorUnknown()) {
+                log.debug("Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff");
+                client.schedule(this, now + retryBackoffMs);
+                return;
             }
-        };
-        client.schedule(task, time.milliseconds() + interval);
-        return task;
+
+            requestInFlight = true;
+            commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+                @Override
+                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                    requestInFlight = false;
+                    if (exception == null) {
+                        reschedule(now + interval);
+                    } else if (exception instanceof SendFailedException) {
+                        log.debug("Failed to send automatic offset commit, will retry immediately");
+                        reschedule(now);
+                    } else {
+                        log.warn("Auto offset commit failed: {}", exception.getMessage());
+                        reschedule(now + interval);
+                    }
+                }
+            });
+        }
     }
 
     private void maybeAutoCommitOffsetsSync() {
         if (autoCommitEnabled) {
+            // disable periodic commits prior to committing synchronously. note that they will
+            // be re-enabled after a rebalance completes
+            autoCommitTask.disable();
+
             try {
                 commitOffsetsSync(subscriptions.allConsumed());
             } catch (WakeupException e) {
@@ -387,7 +436,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 throw e;
             } catch (Exception e) {
                 // consistent with async auto-commit failures, we do not propagate the exception
-                log.error("Auto offset commit failed.", e);
+                log.warn("Auto offset commit failed: ", e.getMessage());
             }
         }
     }


Mime
View raw message