kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: MINOR: ignore wakeups when committing offsets on consumer close
Date Wed, 14 Oct 2015 01:51:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d8c575079 -> 27c099b04


MINOR: ignore wakeups when committing offsets on consumer close

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Gwen Shapira

Closes #306 from hachikuji/handle-wakeup-in-consumer-close


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27c099b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27c099b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27c099b0

Branch: refs/heads/trunk
Commit: 27c099b043704d16a22c33e0602f0ae7295cab25
Parents: d8c5750
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Oct 13 18:50:32 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Oct 13 18:50:32 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Coordinator.java | 14 ++++++++-
 .../integration/kafka/api/ConsumerTest.scala    | 30 ++++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/27c099b0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 701a81b..98193e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -14,6 +14,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.KafkaException;
@@ -235,7 +236,15 @@ public final class Coordinator implements Closeable {
     @Override
     public void close() {
         // commit offsets prior to closing if auto-commit enabled
-        maybeAutoCommitOffsetsSync();
+        while (true) {
+            try {
+                maybeAutoCommitOffsetsSync();
+                return;
+            } catch (ConsumerWakeupException e) {
+                // ignore wakeups while closing to ensure we have a chance to commit
+                continue;
+            }
+        }
     }
 
     private class HeartbeatTask implements DelayedTask {
@@ -430,6 +439,9 @@ public final class Coordinator implements Closeable {
         if (autoCommitEnabled) {
             try {
                 commitOffsetsSync(subscriptions.allConsumed());
+            } catch (ConsumerWakeupException e) {
+                // rethrow wakeups since they are triggered by the user
+                throw e;
             } catch (Exception e) {
                 // consistent with async auto-commit failures, we do not propagate the exception
                 log.error("Auto offset commit failed.", e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/27c099b0/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 166b914..0a02b03 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -116,6 +116,36 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
+  def testAutoCommitOnCloseAfterWakeup() {
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    consumer0.subscribe(List(topic))
+
+    val assignment = Set(tp, tp2)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+
+    // should auto-commit seeked positions before closing
+    consumer0.seek(tp, 300)
+    consumer0.seek(tp2, 500)
+
+    // wakeup the consumer before closing to simulate trying to break a poll
+    // loop from another thread
+    consumer0.wakeup()
+    consumer0.close()
+
+    // now we should see the committed positions from another consumer
+    assertEquals(300, this.consumers(0).committed(tp).offset)
+    assertEquals(500, this.consumers(0).committed(tp2).offset)
+  }
+
+  @Test
   def testAutoCommitOnRebalance() {
     val topic2 = "topic2"
     TestUtils.createTopic(this.zkClient, topic2, 2, serverCount, this.servers)


Mime
View raw message