kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2942: inadvertent auto-commit when pre-fetching can cause message loss
Date Thu, 03 Dec 2015 19:01:25 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 2b4c5dc29 -> 7b46a99d6


KAFKA-2942: inadvertent auto-commit when pre-fetching can cause message loss

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #623 from hachikuji/KAFKA-2942

(cherry picked from commit 13e483adeee8d968397a21bde3bb159516f26ff0)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b46a99d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b46a99d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b46a99d

Branch: refs/heads/0.9.0
Commit: 7b46a99d682e7d79e4114931a85e6daca2bcc954
Parents: 2b4c5dc
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Dec 3 11:01:08 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Dec 3 11:01:20 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java    | 11 +++++------
 .../internals/ConsumerNetworkClient.java         | 19 +++++++++++++++----
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7b46a99d/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b67705a..f7397ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -831,13 +831,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // and avoid block waiting for their responses to enable pipelining while the user
                     // is handling the fetched records.
                     //
-                    // NOTE that in this case we need to disable wakeups for the non-blocking poll since
-                    // the consumed positions has already been updated and hence we must return these
-                    // records to users to process before being interrupted
+                    // NOTE that we use quickPoll() in this case which disables wakeups and delayed
+                    // task execution since the consumed positions has already been updated and we
+                    // must return these records to users to process before being interrupted or
+                    // auto-committing offsets
                     fetcher.initFetches(metadata.fetch());
-                    client.disableWakeups();
-                    client.poll(0);
-                    client.enableWakeups();
+                    client.quickPoll();
                     return new ConsumerRecords<>(records);
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7b46a99d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 84c312e..f707d6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -175,7 +175,7 @@ public class ConsumerNetworkClient implements Closeable {
         long remaining = timeout;
         long now = begin;
         do {
-            poll(remaining, now);
+            poll(remaining, now, true);
             now = time.milliseconds();
             long elapsed = now - begin;
             remaining = timeout - elapsed;
@@ -190,10 +190,20 @@ public class ConsumerNetworkClient implements Closeable {
      * @throws WakeupException if {@link #wakeup()} is called from another thread
      */
     public void poll(long timeout) {
-        poll(timeout, time.milliseconds());
+        poll(timeout, time.milliseconds(), true);
     }
 
-    private void poll(long timeout, long now) {
+    /**
+     * Poll for network IO and return immediately. This will not trigger wakeups,
+     * nor will it execute any delayed tasks.
+     */
+    public void quickPoll() {
+        disableWakeups();
+        poll(0, time.milliseconds(), false);
+        enableWakeups();
+    }
+
+    private void poll(long timeout, long now, boolean executeDelayedTasks) {
         // send all the requests we can send now
         trySend(now);
 
@@ -209,7 +219,8 @@ public class ConsumerNetworkClient implements Closeable {
         checkDisconnects(now);
 
         // execute scheduled tasks
-        delayedTasks.poll(now);
+        if (executeDelayedTasks)
+            delayedTasks.poll(now);
 
         // try again to send requests since buffer space may have been
         // cleared or a connect finished in the poll


Mime
View raw message