kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2342; KafkaConsumer rebalance with in-flight fetch can cause invalid position
Date Wed, 22 Jul 2015 19:59:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 204089046 -> fd3b4cc41


KAFKA-2342; KafkaConsumer rebalance with in-flight fetch can cause invalid position

Author: Jason Gustafson <jason@confluent.io>

Closes #88 from hachikuji/KAFKA-2342 and squashes the following commits:

cabb017 [Jason Gustafson] KAFKA-2342; KafkaConsumer rebalance with in-flight fetch can cause
invalid position


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd3b4cc4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd3b4cc4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd3b4cc4

Branch: refs/heads/trunk
Commit: fd3b4cc41e9249ec6848cde8137691d32b2e79e5
Parents: 2040890
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Jul 22 13:00:03 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jul 22 13:00:03 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 16 ++++++++++++++-
 .../clients/consumer/internals/FetcherTest.java | 21 ++++++++++++++++++--
 .../kafka/api/ConsumerBounceTest.scala          |  2 +-
 3 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fd3b4cc4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index d595c1c..d2a0e2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -219,7 +219,7 @@ public class Fetcher<K, V> {
             for (PartitionRecords<K, V> part : this.records) {
                 Long consumed = subscriptions.consumed(part.partition);
                 if (this.subscriptions.assignedPartitions().contains(part.partition)
-                    && (consumed == null || part.fetchOffset == consumed)) {
+                    && consumed != null && part.fetchOffset == consumed)
{
                     List<ConsumerRecord<K, V>> records = drained.get(part.partition);
                     if (records == null) {
                         records = part.records;
@@ -364,6 +364,20 @@ public class Fetcher<K, V> {
                         parsed.add(parseRecord(tp, logEntry));
                         bytes += logEntry.size();
                     }
+
+                    // we are interested in this fetch only if the beginning offset matches
the
+                    // current consumed position
+                    Long consumed = subscriptions.consumed(tp);
+                    if (consumed == null) {
+                        continue;
+                    } else if (consumed != fetchOffset) {
+                        // the fetched position has gotten out of sync with the consumed
position
+                        // (which might happen when a rebalance occurs with a fetch in-flight),
+                        // so we need to reset the fetch position so the next fetch is right
+                        subscriptions.fetched(tp, consumed);
+                        continue;
+                    }
+
                     if (parsed.size() > 0) {
                         ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                         this.subscriptions.fetched(tp, record.offset() + 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd3b4cc4/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7a4e586..4002679 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -36,6 +36,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -115,8 +116,25 @@ public class FetcherTest {
     }
 
     @Test
+    public void testFetchDuringRebalance() {
+        subscriptions.subscribe(topicName);
+        subscriptions.changePartitionAssignment(Arrays.asList(tp));
+        subscriptions.fetched(tp, 0);
+        subscriptions.consumed(tp, 0);
+
+        fetcher.initFetches(cluster);
+
+        // Now the rebalance happens and fetch positions are cleared
+        subscriptions.changePartitionAssignment(Arrays.asList(tp));
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+        consumerClient.poll(0);
+
+        // The active fetch should be ignored since its position is no longer valid
+        assertTrue(fetcher.fetchedRecords().isEmpty());
+    }
+
+    @Test
     public void testFetchFailed() {
-        List<ConsumerRecord<byte[], byte[]>> records;
         subscriptions.subscribe(tp);
         subscriptions.fetched(tp, 0);
         subscriptions.consumed(tp, 0);
@@ -148,7 +166,6 @@ public class FetcherTest {
 
     @Test
     public void testFetchOutOfRange() {
-        List<ConsumerRecord<byte[], byte[]>> records;
         subscriptions.subscribe(tp);
         subscriptions.fetched(tp, 5);
         subscriptions.consumed(tp, 5);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd3b4cc4/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index b0750fa..d8eee52 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -58,7 +58,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
   }
 
-  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20)
+  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(10)
 
   /*
    * 1. Produce a bunch of messages


Mime
View raw message