kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2570: commit offsets on rebalance/close when auto-commit is enabled
Date Wed, 30 Sep 2015 00:45:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6b1a92c89 -> 5e769ccd5


KAFKA-2570: commit offsets on rebalance/close when auto-commit is enabled

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #257 from hachikuji/KAFKA-2570


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5e769ccd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5e769ccd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5e769ccd

Branch: refs/heads/trunk
Commit: 5e769ccd5bf81f82c90069d4b0e79182a1cfe3c0
Parents: 6b1a92c
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Sep 29 17:49:06 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Sep 29 17:49:06 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 29 ++-------
 .../clients/consumer/internals/Coordinator.java | 50 +++++++++++++++-
 .../consumer/internals/CoordinatorTest.java     |  6 +-
 .../kafka/api/ConsumerBounceTest.scala          | 33 ++++++++---
 .../integration/kafka/api/ConsumerTest.scala    | 62 ++++++++++++++++++++
 5 files changed, 145 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
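For orientation before the diff: the user-visible effect of this change is that a consumer configured with enable.auto.commit=true now commits its consumed offsets synchronously before each rebalance and on close(), rather than relying solely on the periodic asynchronous auto-commit task. A minimal sketch of that behavior, where the broker address, group id, class name, and topic name are hypothetical placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AutoCommitOnCloseSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // hypothetical group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        consumer.subscribe(Collections.singletonList("example-topic"));       // hypothetical topic

        // Consume some records; the in-memory positions advance past the
        // last committed offsets.
        consumer.poll(1000);

        // Before this patch, positions consumed since the last periodic
        // auto-commit were lost here. With it, close() (via the new
        // Coordinator.close()) performs a final synchronous offset commit,
        // and the same commit runs before each rebalance.
        consumer.close();
    }
}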


http://git-wip-us.apache.org/repos/asf/kafka/blob/5e769ccd/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c8fe95e..a8d791e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -17,7 +17,6 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Coordinator;
-import org.apache.kafka.clients.consumer.internals.DelayedTask;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
@@ -414,8 +413,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final SubscriptionState subscriptions;
     private final Metadata metadata;
     private final long retryBackoffMs;
-    private final boolean autoCommit;
-    private final long autoCommitIntervalMs;
     private boolean closed = false;
     private Metadata.Listener metadataListener;
 
@@ -501,8 +498,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             log.debug("Starting the Kafka consumer");
             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             this.time = new SystemTime();
-            this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
-            this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
 
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
@@ -546,7 +541,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     this.time,
                     requestTimeoutMs,
                     retryBackoffMs,
-                    new Coordinator.DefaultOffsetCommitCallback());
+                    new Coordinator.DefaultOffsetCommitCallback(),
+                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
+                    config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);
@@ -581,9 +578,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
 
-            if (autoCommit)
-                scheduleAutoCommitTask(autoCommitIntervalMs);
-
             log.debug("Kafka consumer created");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
@@ -847,21 +841,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         return fetcher.fetchedRecords();
     }
 
-    private void scheduleAutoCommitTask(final long interval) {
-        DelayedTask task = new DelayedTask() {
-            public void run(long now) {
-                commitAsync(new OffsetCommitCallback() {
-                    @Override
-                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                        if (exception != null)
-                            log.error("Auto offset commit failed.", exception);
-                    }
-                });
-                client.schedule(this, now + interval);
-            }
-        };
-        client.schedule(task, time.milliseconds() + interval);
-    }
+
 
     /**
      * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
@@ -1178,6 +1158,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         log.trace("Closing the Kafka consumer.");
         AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
         this.closed = true;
+        ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
         ClientUtils.closeQuietly(client, "consumer network client", firstException);
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e769ccd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 542c326..8326549 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -58,7 +59,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * This class manages the coordination process with the consumer coordinator.
  */
-public final class Coordinator {
+public final class Coordinator implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
 
@@ -74,6 +75,8 @@ public final class Coordinator {
     private final long requestTimeoutMs;
     private final long retryBackoffMs;
     private final OffsetCommitCallback defaultOffsetCommitCallback;
+    private final boolean autoCommitEnabled;
+
     private Node consumerCoordinator;
     private String consumerId;
     private int generation;
@@ -93,7 +96,9 @@ public final class Coordinator {
                        Time time,
                        long requestTimeoutMs,
                        long retryBackoffMs,
-                       OffsetCommitCallback defaultOffsetCommitCallback) {
+                       OffsetCommitCallback defaultOffsetCommitCallback,
+                       boolean autoCommitEnabled,
+                       long autoCommitIntervalMs) {
         this.client = client;
         this.time = time;
         this.generation = -1;
@@ -109,6 +114,10 @@ public final class Coordinator {
         this.requestTimeoutMs = requestTimeoutMs;
         this.retryBackoffMs = retryBackoffMs;
         this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
+        this.autoCommitEnabled = autoCommitEnabled;
+
+        if (autoCommitEnabled)
+            scheduleAutoCommitTask(autoCommitIntervalMs);
     }
 
     /**
@@ -157,6 +166,9 @@ public final class Coordinator {
         if (!subscriptions.partitionAssignmentNeeded())
             return;
 
+        // commit offsets prior to rebalance if auto-commit enabled
+        maybeAutoCommitOffsetsSync();
+
         ConsumerRebalanceListener listener = subscriptions.listener();
 
         // execute the user's listener before rebalance
@@ -219,6 +231,13 @@ public final class Coordinator {
         }
     }
 
+
+    @Override
+    public void close() {
+        // commit offsets prior to closing if auto-commit enabled
+        maybeAutoCommitOffsetsSync();
+    }
+
     private class HeartbeatTask implements DelayedTask {
 
         public void reset() {
@@ -391,6 +410,33 @@ public final class Coordinator {
         }
     }
 
+    private void scheduleAutoCommitTask(final long interval) {
+        DelayedTask task = new DelayedTask() {
+            public void run(long now) {
+                commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+                    @Override
+                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                        if (exception != null)
+                            log.error("Auto offset commit failed.", exception);
+                    }
+                });
+                client.schedule(this, now + interval);
+            }
+        };
+        client.schedule(task, time.milliseconds() + interval);
+    }
+
+    private void maybeAutoCommitOffsetsSync() {
+        if (autoCommitEnabled) {
+            try {
+                commitOffsetsSync(subscriptions.allConsumed());
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not propagate the exception
+                log.error("Auto offset commit failed.", e);
+            }
+        }
+    }
+
     /**
      * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
      * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
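
The maybeAutoCommitOffsetsSync() added above has the Coordinator do internally what manual-commit applications typically do themselves via a ConsumerRebalanceListener. For contrast, a sketch of that manual pattern with enable.auto.commit=false; the topic name, class name, and helper method are hypothetical, while the listener callbacks and the no-arg commitSync() appear elsewhere in this diff:

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitOnRevokeSketch {
    // Hypothetical helper: subscribe so that offsets are flushed whenever
    // partition ownership is about to change, mirroring what the patched
    // Coordinator now does automatically for auto-commit consumers.
    static void subscribeWithCommitOnRevoke(final KafkaConsumer<byte[], byte[]> consumer) {
        consumer.subscribe(Collections.singletonList("example-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Synchronously commit consumed offsets before the rebalance
                // completes, so another group member does not re-read them.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // nothing to do on assignment in this sketch
            }
        });
    }
}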

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e769ccd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 82586ac..12aee11 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -65,6 +65,8 @@ public class CoordinatorTest {
     private int heartbeatIntervalMs = 2;
     private long retryBackoffMs = 100;
     private long requestTimeoutMs = 5000;
+    private boolean autoCommitEnabled = false;
+    private long autoCommitIntervalMs = 5000;
     private String rebalanceStrategy = "not-matter";
     private MockTime time;
     private MockClient client;
@@ -104,7 +106,9 @@ public class CoordinatorTest {
                 time,
                 requestTimeoutMs,
                 retryBackoffMs,
-                defaultOffsetCommitCallback);
+                defaultOffsetCommitCallback,
+                autoCommitEnabled,
+                autoCommitIntervalMs);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e769ccd/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 3f17e79..16d7c26 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -13,10 +13,13 @@
 
 package kafka.api
 
+import java.util
+
 import kafka.server.KafkaConfig
 import kafka.utils.{Logging, ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownConsumerIdException}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{Test, Before}
@@ -73,25 +76,39 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     sendRecords(numRecords)
     this.producers.foreach(_.close)
 
-    var consumed = 0
+    var consumed = 0L
     val consumer = this.consumers(0)
-    consumer.subscribe(List(topic))
+
+    consumer.subscribe(List(topic), new ConsumerRebalanceListener {
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
+      // TODO: until KAFKA-2017 is merged, we have to handle the case in which
+      // the commit fails prior to rebalancing on coordinator fail-over.
+        consumer.seek(tp, consumed)
+      }
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {}
+    })
 
     val scheduler = new BounceBrokerScheduler(numIters)
     scheduler.start()
 
     while (scheduler.isRunning.get()) {
       for (record <- consumer.poll(100)) {
-        assertEquals(consumed.toLong, record.offset())
+        assertEquals(consumed, record.offset())
         consumed += 1
       }
 
-      consumer.commitSync()
-      assertEquals(consumer.position(tp), consumer.committed(tp).offset)
+      try {
+        consumer.commitSync()
+        assertEquals(consumer.position(tp), consumer.committed(tp).offset)
 
-      if (consumer.position(tp) == numRecords) {
-        consumer.seekToBeginning()
-        consumed = 0
+        if (consumer.position(tp) == numRecords) {
+          consumer.seekToBeginning()
+          consumed = 0
+        }
+      } catch {
+        // TODO: should be no need to catch these exceptions once KAFKA-2017 is
+        // merged since coordinator fail-over will not cause a rebalance
+        case _: UnknownConsumerIdException | _: IllegalGenerationException =>
       }
     }
     scheduler.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/5e769ccd/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index af81a83..166b914 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -90,6 +90,68 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
+  def testAutoCommitOnClose() {
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    consumer0.subscribe(List(topic))
+
+    val assignment = Set(tp, tp2)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+
+    // should auto-commit seeked positions before closing
+    consumer0.seek(tp, 300)
+    consumer0.seek(tp2, 500)
+    consumer0.close()
+
+    // now we should see the committed positions from another consumer
+    assertEquals(300, this.consumers(0).committed(tp).offset)
+    assertEquals(500, this.consumers(0).committed(tp2).offset)
+  }
+
+  @Test
+  def testAutoCommitOnRebalance() {
+    val topic2 = "topic2"
+    TestUtils.createTopic(this.zkClient, topic2, 2, serverCount, this.servers)
+
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    consumer0.subscribe(List(topic))
+
+    val assignment = Set(tp, tp2)
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+
+    consumer0.seek(tp, 300)
+    consumer0.seek(tp2, 500)
+
+    // change subscription to trigger rebalance
+    consumer0.subscribe(List(topic, topic2))
+
+    val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == newAssignment.asJava
+    }, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}")
+
+    // after rebalancing, we should have reset to the committed positions
+    assertEquals(300, consumer0.committed(tp).offset)
+    assertEquals(500, consumer0.committed(tp2).offset)
+  }
+
+  @Test
   def testPatternSubscription() {
     val numRecords = 10000
     sendRecords(numRecords)

