kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2390; followup; add unit test for OffsetOutOfRange exception
Date Thu, 24 Sep 2015 21:36:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ac757eb19 -> bcf374da9


KAFKA-2390; followup; add unit test for OffsetOutOfRange exception

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #239 from lindong28/KAFKA-2390-followup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bcf374da
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bcf374da
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bcf374da

Branch: refs/heads/trunk
Commit: bcf374da9e0d9ec8d13ecdaf4a1a15691549442b
Parents: ac757eb
Author: Dong Lin <lindong28@gmail.com>
Authored: Thu Sep 24 14:40:03 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Sep 24 14:40:03 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  4 ++
 .../clients/consumer/internals/Fetcher.java     |  9 +--
 .../clients/consumer/internals/FetcherTest.java | 69 +++++++++++++++-----
 3 files changed, 63 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bcf374da/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b52ace0..d2dcbe3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -775,6 +775,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
      *             offset reset policy has been configured.
+     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
+     *         the defaultResetPolicy is NONE
      */
     @Override
     public ConsumerRecords<K, V> poll(long timeout) {
@@ -812,6 +814,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * heart-beating, auto-commits, and offset updates.
      * @param timeout The maximum time to block in the underlying poll
      * @return The fetched records (may be empty)
+     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
+     *         the defaultResetPolicy is NONE
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bcf374da/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 59249af..4608959 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -263,8 +263,8 @@ public class Fetcher<K, V> {
     }
 
     /**
-     * If any partition from previous fetchResponse contains OffsetOutOfRange error, throw
-     * OffsetOutOfRangeException and clear the partition list.
+     * If any partition from previous fetchResponse contains OffsetOutOfRange error and
+     * the defaultResetPolicy is NONE, throw OffsetOutOfRangeException
      *
      * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
      */
@@ -279,7 +279,7 @@ public class Fetcher<K, V> {
             }
             Long consumed = subscriptions.consumed(entry.getKey());
             // ignore partition if its consumed offset != offset in fetchResponse, e.g. after seek()
-            if (consumed != null && entry.getValue() == consumed)
+            if (consumed != null && entry.getValue().equals(consumed))
                 currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());
         }
         this.offsetOutOfRangePartitions.clear();
@@ -291,7 +291,8 @@ public class Fetcher<K, V> {
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
      * @return The fetched records per partition
-     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
+     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
+     *         the defaultResetPolicy is NONE
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
         if (this.subscriptions.partitionAssignmentNeeded()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bcf374da/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 17ec2ce..b3169d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -56,6 +57,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class FetcherTest {
     private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
@@ -73,27 +75,15 @@ public class FetcherTest {
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
     private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
+    private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
     private Metrics metrics = new Metrics(time);
     private Map<String, String> metricTags = new LinkedHashMap<String, String>();
     private static final double EPSILON = 0.0001;
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
-
-    private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(consumerClient,
-                                                                          minBytes,
-                                                                          maxWaitMs,
-                                                                          fetchSize,
-                                                                          true, // check crc
-                                                                          new ByteArrayDeserializer(),
-                                                                          new ByteArrayDeserializer(),
-                                                                          metadata,
-                                                                          subscriptions,
-                                                                          metrics,
-                                                                          "consumer" + groupId,
-                                                                          metricTags,
-                                                                          time,
-                                                                          retryBackoffMs);
+    private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
+    private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, new Metrics(time));
 
     @Before
     public void setup() throws Exception {
@@ -207,6 +197,38 @@ public class FetcherTest {
     }
 
     @Test
+    public void testFetchedRecordsAfterSeek() {
+        subscriptionsNoAutoReset.assign(Arrays.asList(tp));
+        subscriptionsNoAutoReset.seek(tp, 0);
+
+        fetcherNoAutoReset.initFetches(cluster);
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        consumerClient.poll(0);
+        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
+        subscriptionsNoAutoReset.seek(tp, 2);
+        assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
+    }
+
+    @Test
+    public void testFetchOffsetOutOfRangeException() {
+        subscriptionsNoAutoReset.assign(Arrays.asList(tp));
+        subscriptionsNoAutoReset.seek(tp, 0);
+
+        fetcherNoAutoReset.initFetches(cluster);
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        consumerClient.poll(0);
+        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
+        try {
+            fetcherNoAutoReset.fetchedRecords();
+            fail("Should have thrown OffsetOutOfRangeException");
+        } catch (OffsetOutOfRangeException e) {
+            assertTrue(e.offsetOutOfRangePartitions().containsKey(tp));
+            assertEquals(e.offsetOutOfRangePartitions().size(), 1);
+        }
+        assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
+    }
+
+    @Test
     public void testFetchDisconnected() {
         subscriptions.assign(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
@@ -358,4 +380,21 @@ public class FetcherTest {
         FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), throttleTime);
         return response.toStruct();
     }
+
+    private  Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
+        return new Fetcher<byte[], byte[]>(consumerClient,
+                minBytes,
+                maxWaitMs,
+                fetchSize,
+                true, // check crc
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(),
+                metadata,
+                subscriptions,
+                metrics,
+                "consumer" + groupId,
+                metricTags,
+                time,
+                retryBackoffMs);
+    }
 }


Mime
View raw message