kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2390; OffsetOutOfRangeException should contain the Offset and Partition info.
Date Wed, 23 Sep 2015 23:49:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d1dd1e902 -> b9ceac3ac


KAFKA-2390; OffsetOutOfRangeException should contain the Offset and Partition info.

Author: Dong Lin <lindong28@gmail.com>
Author: Dong Lin <lindong@cis.upenn.edu>

Reviewers: Jason Gustafson, Ismael Juma, Guozhang Wang

Closes #118 from lindong28/KAFKA-2390


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9ceac3a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9ceac3a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9ceac3a

Branch: refs/heads/trunk
Commit: b9ceac3ace353526da9c5d0f5b486265c871793a
Parents: d1dd1e9
Author: Dong Lin <lindong28@gmail.com>
Authored: Wed Sep 23 16:53:05 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Sep 23 16:53:05 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  6 +++
 .../clients/consumer/internals/Fetcher.java     | 47 ++++++++++++++++++--
 .../consumer/internals/SubscriptionState.java   |  4 ++
 .../errors/OffsetOutOfRangeException.java       | 12 +++++
 4 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b9ceac3a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 0b67915..b52ace0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -828,6 +828,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
         // init any new fetches (won't resend pending fetches)
         Cluster cluster = this.metadata.fetch();
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
+        // Avoid block waiting for response if we already have data available, e.g. from another API call to commit.
+        if (!records.isEmpty()) {
+            client.poll(0);
+            return records;
+        }
         fetcher.initFetches(cluster);
         client.poll(timeout);
         return fetcher.fetchedRecords();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9ceac3a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 487fb0d..59249af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -78,6 +79,8 @@ public class Fetcher<K, V> {
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
 
+    private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
+
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
                    int maxWaitMs,
@@ -106,6 +109,7 @@ public class Fetcher<K, V> {
         this.valueDeserializer = valueDeserializer;
 
         this.records = new LinkedList<PartitionRecords<K, V>>();
+        this.offsetOutOfRangePartitions = new HashMap<>();
 
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
         this.retryBackoffMs = retryBackoffMs;
@@ -137,6 +141,7 @@ public class Fetcher<K, V> {
     /**
      * Update the fetch positions for the provided partitions.
      * @param partitions
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available
      */
     public void updateFetchPositions(Set<TopicPartition> partitions) {
         // reset the fetch position to the committed position
@@ -258,15 +263,43 @@ public class Fetcher<K, V> {
     }
 
     /**
+     * If any partition from the previous fetch response returned an OffsetOutOfRange error,
+     * throw an OffsetOutOfRangeException for the partitions that are still relevant and clear
+     * the recorded partition map.
+     *
+     * A recorded partition is reported only if it is still fetchable and its consumed position
+     * is still equal to the offset that was reported out of range (e.g. it was not moved by seek()).
+     * The recorded map is cleared on every call, whether or not an exception is thrown.
+     *
+     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
+     */
+    private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException {
+        Map<TopicPartition, Long> currentOutOfRangePartitions = new HashMap<>();
+
+        // filter offsetOutOfRangePartitions to retain only the fetchable partitions
+        for (Map.Entry<TopicPartition, Long> entry: this.offsetOutOfRangePartitions.entrySet()) {
+            if (!subscriptions.isFetchable(entry.getKey())) {
+                log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey());
+                continue;
+            }
+            Long consumed = subscriptions.consumed(entry.getKey());
+            // ignore partition if its consumed offset != offset in fetchResponse, e.g. after seek()
+            // Both operands are boxed Longs: compare by value with equals(), since == would
+            // compare object references and miss equal offsets held in distinct Long instances.
+            if (consumed != null && consumed.equals(entry.getValue()))
+                currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());
+        }
+        this.offsetOutOfRangePartitions.clear();
+        if (!currentOutOfRangePartitions.isEmpty())
+            throw new OffsetOutOfRangeException(currentOutOfRangePartitions);
+    }
+
+    /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
      * @return The fetched records per partition
+     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
         if (this.subscriptions.partitionAssignmentNeeded()) {
             return Collections.emptyMap();
         } else {
             Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
+            throwIfOffsetOutOfRange();
+
             for (PartitionRecords<K, V> part : this.records) {
                 if (!subscriptions.isFetchable(part.partition)) {
                     log.debug("Ignoring fetched records for {} since it is no longer fetchable", part.partition);
@@ -378,8 +411,11 @@ public class Fetcher<K, V> {
                     fetchable.put(node, fetch);
                 }
 
-                long offset = this.subscriptions.fetched(partition);
-                fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
+                long fetched = this.subscriptions.fetched(partition);
+                long consumed = this.subscriptions.consumed(partition);
+                // Only fetch data for partitions whose previously fetched data has been consumed
+                if (consumed == fetched)
+                    fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize));
             }
         }
 
@@ -447,9 +483,12 @@ public class Fetcher<K, V> {
                     || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                     this.metadata.requestUpdate();
                 } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
-                    // TODO: this could be optimized by grouping all out-of-range partitions
+                    long fetchOffset = request.fetchData().get(tp).offset;
+                    if (subscriptions.hasDefaultOffsetResetPolicy())
+                        subscriptions.needOffsetReset(tp);
+                    else
+                        this.offsetOutOfRangePartitions.put(tp, fetchOffset);
                     log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
-                    subscriptions.needOffsetReset(tp);
                 } else if (partition.errorCode == Errors.UNKNOWN.code()) {
                     log.warn("Unknown error fetching data for topic-partition {}", tp);
                 } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9ceac3a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 1f4deea..9b610d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -244,6 +244,10 @@ public class SubscriptionState {
         needOffsetReset(partition, defaultResetStrategy);
     }
 
+    public boolean hasDefaultOffsetResetPolicy() {
+        return defaultResetStrategy != OffsetResetStrategy.NONE;
+    }
+
     public boolean isOffsetResetNeeded(TopicPartition partition) {
         return assignedState(partition).awaitingReset;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9ceac3a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
index fc7c6e3..4983bc0 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
@@ -12,6 +12,9 @@
  */
 package org.apache.kafka.common.errors;
 
+import org.apache.kafka.common.TopicPartition;
+import java.util.Map;
+
 /**
  * This offset is either larger or smaller than the range of offsets the server has for the given partition.
  * 
@@ -19,10 +22,15 @@ package org.apache.kafka.common.errors;
 public class OffsetOutOfRangeException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
+    private Map<TopicPartition, Long> offsetOutOfRangePartitions = null;
 
     public OffsetOutOfRangeException() {
     }
 
+    public OffsetOutOfRangeException(Map<TopicPartition, Long> offsetOutOfRangePartitions) {
+        this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
+    }
+
     public OffsetOutOfRangeException(String message) {
         super(message);
     }
@@ -35,4 +43,8 @@ public class OffsetOutOfRangeException extends RetriableException {
         super(message, cause);
     }
 
+    public Map<TopicPartition, Long> offsetOutOfRangePartitions() {
+        return offsetOutOfRangePartitions;
+    }
+
 }


Mime
View raw message