kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2978: consumer stops fetching when consumed and fetch positions get out of sync
Date Mon, 14 Dec 2015 22:54:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3fed57909 -> e08b922aa


KAFKA-2978: consumer stops fetching when consumed and fetch positions get out of sync

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Michal Turek, Ismael Juma, Guozhang Wang

Closes #666 from hachikuji/KAFKA-2978


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e08b922a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e08b922a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e08b922a

Branch: refs/heads/trunk
Commit: e08b922aad485a96fceb51d5c27877729156ddab
Parents: 3fed579
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Dec 14 14:54:22 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Dec 14 14:54:22 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  4 +-
 .../kafka/clients/consumer/MockConsumer.java    |  6 +--
 .../clients/consumer/internals/Fetcher.java     | 51 +++++++++-----------
 .../consumer/internals/SubscriptionState.java   | 40 +++++----------
 .../clients/consumer/internals/FetcherTest.java | 27 ++++-------
 .../internals/SubscriptionStateTest.java        | 29 +++--------
 6 files changed, 54 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e08b922a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index a7ec02b..2c0db37 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1071,10 +1071,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
         try {
             if (!this.subscriptions.isAssigned(partition))
                 throw new IllegalArgumentException("You can only check the position for partitions
assigned to this consumer.");
-            Long offset = this.subscriptions.consumed(partition);
+            Long offset = this.subscriptions.position(partition);
             if (offset == null) {
                 updateFetchPositions(Collections.singleton(partition));
-                offset = this.subscriptions.consumed(partition);
+                offset = this.subscriptions.position(partition);
             }
             return offset;
         } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e08b922a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 72fbe9e..c7f0a46 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -154,7 +154,7 @@ public class MockConsumer<K, V> implements Consumer<K, V>
{
             if (!subscriptions.isPaused(entry.getKey())) {
                 List<ConsumerRecord<K, V>> recs = entry.getValue();
                 if (!recs.isEmpty())
-                    this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset()
+ 1);
+                    this.subscriptions.position(entry.getKey(), recs.get(recs.size() - 1).offset()
+ 1);
             }
         }
 
@@ -229,10 +229,10 @@ public class MockConsumer<K, V> implements Consumer<K, V>
{
         ensureNotClosed();
         if (!this.subscriptions.isAssigned(partition))
             throw new IllegalArgumentException("You can only check the position for partitions
assigned to this consumer.");
-        Long offset = this.subscriptions.consumed(partition);
+        Long offset = this.subscriptions.position(partition);
         if (offset == null) {
             updateFetchPosition(partition);
-            offset = this.subscriptions.consumed(partition);
+            offset = this.subscriptions.position(partition);
         }
         return offset;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e08b922a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e152088..c06e899 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -333,9 +333,9 @@ public class Fetcher<K, V> {
                 log.debug("Ignoring fetched records for {} since it is no longer fetchable",
entry.getKey());
                 continue;
             }
-            Long consumed = subscriptions.consumed(entry.getKey());
-            // ignore partition if its consumed offset != offset in fetchResponse, e.g. after
seek()
-            if (consumed != null && entry.getValue().equals(consumed))
+            Long position = subscriptions.position(entry.getKey());
+            // ignore partition if the current position != the offset in fetchResponse, e.g.
after seek()
+            if (position != null && entry.getValue().equals(position))
                 currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());
         }
         this.offsetOutOfRangePartitions.clear();
@@ -401,18 +401,15 @@ public class Fetcher<K, V> {
 
                 // note that the consumed position should always be available
                 // as long as the partition is still assigned
-                long consumed = subscriptions.consumed(part.partition);
+                long position = subscriptions.position(part.partition);
                 if (!subscriptions.isFetchable(part.partition)) {
-                    // this can happen when a partition consumption paused before fetched
records are returned to the consumer's poll call
+                    // this can happen when a partition is paused before fetched records
are returned to the consumer's poll call
                     log.debug("Not returning fetched records for assigned partition {} since
it is no longer fetchable", part.partition);
-
-                    // we also need to reset the fetch positions to pretend we did not fetch
-                    // this partition in the previous request at all
-                    subscriptions.fetched(part.partition, consumed);
-                } else if (part.fetchOffset == consumed) {
+                } else if (part.fetchOffset == position) {
                     long nextOffset = part.records.get(part.records.size() - 1).offset()
+ 1;
 
-                    log.trace("Returning fetched records for assigned partition {} and update
consumed position to {}", part.partition, nextOffset);
+                    log.trace("Returning fetched records at offset {} for assigned partition
{} and update " +
+                            "position to {}", position, part.partition, nextOffset);
 
                     List<ConsumerRecord<K, V>> records = drained.get(part.partition);
                     if (records == null) {
@@ -421,11 +418,13 @@ public class Fetcher<K, V> {
                     } else {
                         records.addAll(part.records);
                     }
-                    subscriptions.consumed(part.partition, nextOffset);
+
+                    subscriptions.position(part.partition, nextOffset);
                 } else {
                     // these records aren't next in line based on the last consumed position,
ignore them
                     // they must be from an obsolete request
-                    log.debug("Ignoring fetched records for {} at offset {}", part.partition,
part.fetchOffset);
+                    log.debug("Ignoring fetched records for {} at offset {} since the current
position is {}",
+                            part.partition, part.fetchOffset, position);
                 }
             }
             this.records.clear();
@@ -513,11 +512,9 @@ public class Fetcher<K, V> {
                     fetchable.put(node, fetch);
                 }
 
-                long fetched = this.subscriptions.fetched(partition);
-                long consumed = this.subscriptions.consumed(partition);
-                // Only fetch data for partitions whose previously fetched data has been
consumed
-                if (consumed == fetched)
-                    fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize));
+                long position = this.subscriptions.position(partition);
+                fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
+                log.trace("Added fetch request for partition {} at offset {}", partition,
position);
             }
         }
 
@@ -550,29 +547,25 @@ public class Fetcher<K, V> {
 
                 // we are interested in this fetch only if the beginning offset matches the
                 // current consumed position
-                Long consumed = subscriptions.consumed(tp);
-                if (consumed == null) {
-                    continue;
-                } else if (consumed != fetchOffset) {
-                    // the fetched position has gotten out of sync with the consumed position
-                    // (which might happen when a rebalance occurs with a fetch in-flight),
-                    // so we need to reset the fetch position so the next fetch is right
-                    subscriptions.fetched(tp, consumed);
+                Long position = subscriptions.position(tp);
+                if (position == null || position != fetchOffset) {
+                    log.debug("Discarding fetch response for partition {} since its offset
{} does not match " +
+                            "the expected offset {}", tp, fetchOffset, position);
                     continue;
                 }
 
                 int bytes = 0;
                 ByteBuffer buffer = partition.recordSet;
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
-                List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K,
V>>();
+                List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                 for (LogEntry logEntry : records) {
                     parsed.add(parseRecord(tp, logEntry));
                     bytes += logEntry.size();
                 }
 
                 if (!parsed.isEmpty()) {
+                    log.trace("Adding fetched record for partition {} with offset {} to buffered
record list", tp, position);
                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
-                    this.subscriptions.fetched(tp, record.offset() + 1);
                     this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
                 } else if (buffer.limit() > 0) {
@@ -594,7 +587,7 @@ public class Fetcher<K, V> {
                     subscriptions.needOffsetReset(tp);
                 else
                     this.offsetOutOfRangePartitions.put(tp, fetchOffset);
-                log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
+                log.info("Fetch offset {} is out of range, resetting offset", fetchOffset);
             } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
                 log.warn("Not authorized to read from topic {}.", tp.topic());
                 unauthorizedTopics.add(tp.topic());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e08b922a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index a142196..9efaf8c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -214,14 +214,6 @@ public class SubscriptionState {
         return this.groupSubscription;
     }
 
-    public Long fetched(TopicPartition tp) {
-        return assignedState(tp).fetched;
-    }
-
-    public void fetched(TopicPartition tp, long offset) {
-        assignedState(tp).fetched(offset);
-    }
-
     private TopicPartitionState assignedState(TopicPartition tp) {
         TopicPartitionState state = this.assignment.get(tp);
         if (state == null)
@@ -270,12 +262,12 @@ public class SubscriptionState {
         return !this.subscription.isEmpty();
     }
 
-    public void consumed(TopicPartition tp, long offset) {
-        assignedState(tp).consumed(offset);
+    public void position(TopicPartition tp, long offset) {
+        assignedState(tp).position(offset);
     }
 
-    public Long consumed(TopicPartition tp) {
-        return assignedState(tp).consumed;
+    public Long position(TopicPartition tp) {
+        return assignedState(tp).position;
     }
 
     public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
@@ -283,7 +275,7 @@ public class SubscriptionState {
         for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignment.entrySet())
{
             TopicPartitionState state = entry.getValue();
             if (state.hasValidPosition)
-                allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.consumed));
+                allConsumed.put(entry.getKey(), new OffsetAndMetadata(state.position));
         }
         return allConsumed;
     }
@@ -356,8 +348,7 @@ public class SubscriptionState {
     }
 
     private static class TopicPartitionState {
-        private Long consumed;   // offset exposed to the user
-        private Long fetched;    // current fetch position
+        private Long position;
         private OffsetAndMetadata committed;  // last committed position
 
         private boolean hasValidPosition; // whether we have valid consumed and fetched positions
@@ -367,8 +358,7 @@ public class SubscriptionState {
 
         public TopicPartitionState() {
             this.paused = false;
-            this.consumed = null;
-            this.fetched = null;
+            this.position = null;
             this.committed = null;
             this.awaitingReset = false;
             this.hasValidPosition = false;
@@ -378,29 +368,21 @@ public class SubscriptionState {
         private void awaitReset(OffsetResetStrategy strategy) {
             this.awaitingReset = true;
             this.resetStrategy = strategy;
-            this.consumed = null;
-            this.fetched = null;
+            this.position = null;
             this.hasValidPosition = false;
         }
 
         private void seek(long offset) {
-            this.consumed = offset;
-            this.fetched = offset;
+            this.position = offset;
             this.awaitingReset = false;
             this.resetStrategy = null;
             this.hasValidPosition = true;
         }
 
-        private void fetched(long offset) {
+        private void position(long offset) {
             if (!hasValidPosition)
                 throw new IllegalStateException("Cannot update fetch position without valid
consumed/fetched positions");
-            this.fetched = offset;
-        }
-
-        private void consumed(long offset) {
-            if (!hasValidPosition)
-                throw new IllegalStateException("Cannot update consumed position without
valid consumed/fetched positions");
-            this.consumed = offset;
+            this.position = offset;
         }
 
         private void committed(OffsetAndMetadata offset) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e08b922a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7e8bd40..79e47c0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -119,8 +119,7 @@ public class FetcherTest {
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(3, records.size());
-        assertEquals(4L, (long) subscriptions.fetched(tp)); // this is the next fetching
position
-        assertEquals(4L, (long) subscriptions.consumed(tp));
+        assertEquals(4L, (long) subscriptions.position(tp)); // this is the next fetching
position
         long offset = 1;
         for (ConsumerRecord<byte[], byte[]> record : records) {
             assertEquals(offset, record.offset());
@@ -149,8 +148,7 @@ public class FetcherTest {
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp);
         assertEquals(3, consumerRecords.size());
-        assertEquals(31L, (long) subscriptions.fetched(tp)); // this is the next fetching
position
-        assertEquals(31L, (long) subscriptions.consumed(tp));
+        assertEquals(31L, (long) subscriptions.position(tp)); // this is the next fetching
position
 
         assertEquals(15L, consumerRecords.get(0).offset());
         assertEquals(20L, consumerRecords.get(1).offset());
@@ -267,8 +265,7 @@ public class FetcherTest {
         consumerClient.poll(0);
         assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertEquals(null, subscriptions.fetched(tp));
-        assertEquals(null, subscriptions.consumed(tp));
+        assertEquals(null, subscriptions.position(tp));
     }
 
     @Test
@@ -316,8 +313,7 @@ public class FetcherTest {
         // disconnects should have no affect on subscription state
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(0, (long) subscriptions.fetched(tp));
-        assertEquals(0, (long) subscriptions.consumed(tp));
+        assertEquals(0, (long) subscriptions.position(tp));
     }
 
     @Test
@@ -329,8 +325,7 @@ public class FetcherTest {
 
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.fetched(tp));
-        assertEquals(5, (long) subscriptions.consumed(tp));
+        assertEquals(5, (long) subscriptions.position(tp));
     }
 
     @Test
@@ -343,8 +338,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.fetched(tp));
-        assertEquals(5, (long) subscriptions.consumed(tp));
+        assertEquals(5, (long) subscriptions.position(tp));
     }
 
     @Test
@@ -357,8 +351,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.fetched(tp));
-        assertEquals(5, (long) subscriptions.consumed(tp));
+        assertEquals(5, (long) subscriptions.position(tp));
     }
 
     @Test
@@ -371,8 +364,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.fetched(tp));
-        assertEquals(5, (long) subscriptions.consumed(tp));
+        assertEquals(5, (long) subscriptions.position(tp));
     }
 
     @Test
@@ -390,8 +382,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.fetched(tp));
-        assertEquals(5, (long) subscriptions.consumed(tp));
+        assertEquals(5, (long) subscriptions.position(tp));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/e08b922a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 6566025..439ded7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -60,13 +60,11 @@ public class SubscriptionStateTest {
     public void partitionReset() {
         state.assignFromUser(Arrays.asList(tp0));
         state.seek(tp0, 5);
-        assertEquals(5L, (long) state.fetched(tp0));
-        assertEquals(5L, (long) state.consumed(tp0));
+        assertEquals(5L, (long) state.position(tp0));
         state.needOffsetReset(tp0);
         assertFalse(state.isFetchable(tp0));
         assertTrue(state.isOffsetResetNeeded(tp0));
-        assertEquals(null, state.fetched(tp0));
-        assertEquals(null, state.consumed(tp0));
+        assertEquals(null, state.position(tp0));
 
         // seek should clear the reset and make the partition fetchable
         state.seek(tp0, 0);
@@ -114,33 +112,20 @@ public class SubscriptionStateTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void invalidConsumedPositionUpdate() {
+    public void invalidPositionUpdate() {
         state.subscribe(Arrays.asList(topic), rebalanceListener);
         state.assignFromSubscribed(asList(tp0));
-        state.consumed(tp0, 0);
+        state.position(tp0, 0);
     }
 
     @Test(expected = IllegalStateException.class)
-    public void invalidFetchPositionUpdate() {
-        state.subscribe(Arrays.asList(topic), rebalanceListener);
-        state.assignFromSubscribed(asList(tp0));
-        state.fetched(tp0, 0);
+    public void cantChangePositionForNonAssignedPartition() {
+        state.position(tp0, 1);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void cantChangeFetchPositionForNonAssignedPartition() {
-        state.fetched(tp0, 1);
-    }
-    
-    @Test(expected = IllegalStateException.class)
-    public void cantChangeConsumedPositionForNonAssignedPartition() {
-        state.consumed(tp0, 1);
-    }
-    
     public void assertAllPositions(TopicPartition tp, Long offset) {
         assertEquals(offset.longValue(), state.committed(tp).offset());
-        assertEquals(offset, state.fetched(tp));
-        assertEquals(offset, state.consumed(tp));
+        assertEquals(offset, state.position(tp));
     }
 
     @Test(expected = IllegalStateException.class)


Mime
View raw message