kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-5075; Defer exception to the next pollOnce() if consumer's fetch position has already increased
Date: Tue, 18 Apr 2017 00:28:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 863867582 -> a26731674


KAFKA-5075; Defer exception to the next pollOnce() if consumer's fetch position has already increased

Author: Dong Lin <lindong28@gmail.com>
Author: Dong Lin <lindong28@users.noreply.github.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>

Closes #2859 from lindong28/KAFKA-5075

This is a backport patch for 0.10.2 after resolving the following conflict.
Conflicts:
	clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
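
For context, the sketch below is not part of the commit; it illustrates, under assumed names
(broker address, group id, topic, and the seek-to-beginning recovery are all hypothetical),
what the deferred exception looks like from the application side: records drained from healthy
partitions before the failure are still returned by poll(), and the OffsetOutOfRangeException
only surfaces on a later poll, where it can be handled without losing records already delivered.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

public class DeferredFetchExceptionExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");     // hypothetical broker
        props.put("group.id", "deferred-exception-example");  // hypothetical group; assumes committed offsets exist
        props.put("auto.offset.reset", "none");                // no automatic reset, so the exception reaches the caller
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("example-topic"));  // hypothetical topic
        try {
            while (true) {
                try {
                    // Records fetched from healthy partitions before the out-of-range
                    // partition was parsed are still delivered here; the exception is
                    // deferred to a later poll instead of being thrown in their place.
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                    for (ConsumerRecord<byte[], byte[]> record : records)
                        System.out.printf("%s-%d @ %d%n", record.topic(), record.partition(), record.offset());
                } catch (OffsetOutOfRangeException e) {
                    // Only the out-of-range partitions are reported; seeking them to a valid
                    // position clears the deferred exception (compare testSeekBeforeException).
                    for (Map.Entry<TopicPartition, Long> entry : e.offsetOutOfRangePartitions().entrySet())
                        consumer.seekToBeginning(Collections.singletonList(entry.getKey()));
                }
            }
        } finally {
            consumer.close();
        }
    }
}

If the application instead seeks the affected partition to a new position before the next poll,
the deferred exception is dropped, which is the behaviour exercised by testSeekBeforeException
in the diff below.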


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a2673167
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a2673167
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a2673167

Branch: refs/heads/0.10.2
Commit: a26731674815a57368ad4500a77e84a218a40e63
Parents: 8638675
Author: Dong Lin <lindong28@gmail.com>
Authored: Sun Apr 16 23:23:25 2017 -0700
Committer: Jiangjie Qin <becket.qin@gmail.com>
Committed: Mon Apr 17 17:27:41 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 78 ++++++++++-----
 .../clients/consumer/internals/FetcherTest.java | 99 +++++++++++++++++---
 .../kafka/api/AuthorizerIntegrationTest.scala   |  1 +
 3 files changed, 143 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a2673167/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6a13d46..c8bbfa3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -95,6 +95,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
     private final Deserializer<V> valueDeserializer;
 
     private PartitionRecords<K, V> nextInLineRecords = null;
+    private ExceptionMetadata nextInLineExceptionMetadata = null;
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
@@ -461,35 +462,54 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
      *         the defaultResetPolicy is NONE
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
+        if (nextInLineExceptionMetadata != null) {
+            ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata;
+            nextInLineExceptionMetadata = null;
+            TopicPartition tp = exceptionMetadata.partition;
+            if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset)
+                throw exceptionMetadata.exception;
+        }
+
         Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
         int recordsRemaining = maxPollRecords;
+        TopicPartition fetchedPartition = null;
+        long fetchedOffsets = -1L;
 
-        while (recordsRemaining > 0) {
-            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
-                CompletedFetch completedFetch = completedFetches.poll();
-                if (completedFetch == null)
-                    break;
-
-                nextInLineRecords = parseCompletedFetch(completedFetch);
-            } else {
-                TopicPartition partition = nextInLineRecords.partition;
-                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
-                if (!records.isEmpty()) {
-                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
-                    if (currentRecords == null) {
-                        drained.put(partition, records);
-                    } else {
-                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                        // but it might conceivably happen in some rare cases (such as partition leader changes).
-                        // we have to copy to a new list because the old one may be immutable
-                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                        newRecords.addAll(currentRecords);
-                        newRecords.addAll(records);
-                        drained.put(partition, newRecords);
+        try {
+            while (recordsRemaining > 0) {
+                if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
+                    CompletedFetch completedFetch = completedFetches.poll();
+                    if (completedFetch == null) break;
+
+                    fetchedPartition = completedFetch.partition;
+                    fetchedOffsets = completedFetch.fetchedOffset;
+                    nextInLineRecords = parseCompletedFetch(completedFetch);
+                } else {
+                    TopicPartition partition = nextInLineRecords.partition;
+                    fetchedPartition = partition;
+                    fetchedOffsets = subscriptions.position(partition);
+                    List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
+                    if (!records.isEmpty()) {
+                        List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
+                        if (currentRecords == null) {
+                            drained.put(partition, records);
+                        } else {
+                            // this case shouldn't usually happen because we only send one fetch at a time per partition,
+                            // but it might conceivably happen in some rare cases (such as partition leader changes).
+                            // we have to copy to a new list because the old one may be immutable
+                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
+                            newRecords.addAll(currentRecords);
+                            newRecords.addAll(records);
+                            drained.put(partition, newRecords);
+                        }
+                        recordsRemaining -= records.size();
                     }
-                    recordsRemaining -= records.size();
                 }
             }
+        } catch (KafkaException e) {
+            if (drained.isEmpty())
+                throw e;
+            nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffsets, e);
         }
 
         return drained;
@@ -1139,4 +1159,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
         }
     }
 
+    private static class ExceptionMetadata {
+        private final TopicPartition partition;
+        private final long fetchedOffset;
+        private final KafkaException exception;
+
+        private ExceptionMetadata(TopicPartition partition, long fetchedOffset, KafkaException exception) {
+            this.partition = partition;
+            this.fetchedOffset = fetchedOffset;
+            this.exception = exception;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2673167/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4d388e6..24ba434 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -57,6 +57,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -84,6 +85,7 @@ public class FetcherTest {
     private String groupId = "test-group";
     private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
     private TopicPartition tp = new TopicPartition(topicName, 0);
+    private TopicPartition tp1 = new TopicPartition(topicName, 1);
     private int minBytes = 1;
     private int maxBytes = Integer.MAX_VALUE;
     private int maxWaitMs = 0;
@@ -92,7 +94,7 @@ public class FetcherTest {
     private MockTime time = new MockTime(1);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private MockClient client = new MockClient(time, metadata);
-    private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
+    private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
     private Node node = cluster.nodes().get(0);
     private Metrics metrics = new Metrics(time);
     private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
@@ -827,28 +829,28 @@ public class FetcherTest {
     }
 
     private void testGetOffsetsForTimesWithError(Errors errorForTp0,
-                                                 Errors errorForTp1,
+                                                 Errors errorFortp,
                                                  long offsetForTp0,
-                                                 long offsetForTp1,
+                                                 long offsetFortp,
                                                  Long expectedOffsetForTp0,
-                                                 Long expectedOffsetForTp1) {
+                                                 Long expectedOffsetFortp) {
         client.reset();
         TopicPartition tp0 = tp;
-        TopicPartition tp1 = new TopicPartition(topicName, 1);
+        TopicPartition tp = new TopicPartition(topicName, 1);
         // Ensure metadata has both partition.
         Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         // First try should fail due to metadata error.
         client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(tp, errorFortp, offsetFortp, offsetFortp), cluster.leaderFor(tp));
         // Second try should succeed.
         client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(tp, Errors.NONE, offsetFortp, offsetFortp), cluster.leaderFor(tp));
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(tp0, 0L);
-        timestampToSearch.put(tp1, 0L);
+        timestampToSearch.put(tp, 0L);
         Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);
 
         if (expectedOffsetForTp0 == null)
@@ -858,14 +860,87 @@ public class FetcherTest {
             assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).offset());
         }
 
-        if (expectedOffsetForTp1 == null)
-            assertNull(offsetAndTimestampMap.get(tp1));
+        if (expectedOffsetFortp == null)
+            assertNull(offsetAndTimestampMap.get(tp));
         else {
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).offset());
+            assertEquals(expectedOffsetFortp.longValue(), offsetAndTimestampMap.get(tp).timestamp());
+            assertEquals(expectedOffsetFortp.longValue(), offsetAndTimestampMap.get(tp).offset());
         }
     }
 
+    @Test
+    public void testFetchPositionAfterException() {
+        // Verify that the advancement of the next fetch offset equals the number of fetched records when
+        // some fetched partitions cause an exception. This ensures that the consumer won't lose records upon exception.
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp, tp1));
+        subscriptionsNoAutoReset.seek(tp, 1);
+        subscriptionsNoAutoReset.seek(tp1, 1);
+
+        assertEquals(1, fetcherNoAutoReset.sendFetches());
+
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+        partitions.put(tp, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code(), 100, MemoryRecords.EMPTY));
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE.code(), 100, records));
+        client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+        consumerClient.poll(0);
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
+        List<OffsetOutOfRangeException> exceptions = new ArrayList<>();
+
+        try {
+            for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+                fetchedRecords.addAll(records);
+        } catch (OffsetOutOfRangeException e) {
+            exceptions.add(e);
+        }
+
+        assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp1) - 1);
+
+        try {
+            for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+                fetchedRecords.addAll(records);
+        } catch (OffsetOutOfRangeException e) {
+            exceptions.add(e);
+        }
+
+        assertEquals(4, subscriptionsNoAutoReset.position(tp1).longValue());
+        assertEquals(3, fetchedRecords.size());
+
+        // Should have received one OffsetOutOfRangeException for partition tp
+        assertEquals(1, exceptions.size());
+        OffsetOutOfRangeException e = exceptions.get(0);
+        assertTrue(e.offsetOutOfRangePartitions().containsKey(tp));
+        assertEquals(e.offsetOutOfRangePartitions().size(), 1);
+    }
+
+    @Test
+    public void testSeekBeforeException() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2);
+
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp));
+        subscriptionsNoAutoReset.seek(tp, 1);
+        assertEquals(1, fetcher.sendFetches());
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+        partitions.put(tp, new FetchResponse.PartitionData(Errors.NONE.code(), 100, records));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
+        consumerClient.poll(0);
+
+        assertEquals(2, fetcher.fetchedRecords().get(tp).size());
+
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp, tp1));
+        subscriptionsNoAutoReset.seek(tp1, 1);
+        assertEquals(1, fetcher.sendFetches());
+        partitions = new HashMap<>();
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code(), 100, MemoryRecords.EMPTY));
+        client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+        consumerClient.poll(0);
+        assertEquals(1, fetcher.fetchedRecords().get(tp).size());
+
+        subscriptionsNoAutoReset.seek(tp1, 10);
+        // Should not throw OffsetOutOfRangeException after the seek
+        assertEquals(0, fetcher.fetchedRecords().size());
+    }
+
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
         // matches any list offset request with the provided timestamp
         return new MockClient.RequestMatcher() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a2673167/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c9d35af..1ba3214 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -605,6 +605,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     try {
       consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
       consumeRecords(consumer)
+      consumeRecords(consumer)
       Assert.fail("Expected TopicAuthorizationException")
     } catch {
       case _: TopicAuthorizationException => //expected

