kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4937: Batch offset fetches in the Consumer
Date Thu, 20 Apr 2017 20:36:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 588ed4644 -> 98d62a880


KAFKA-4937: Batch offset fetches in the Consumer

Change `consumer.position` so that it always updates any partitions that need an update. Keep
track of partitions that need `seekToBeginning` in `StoreChangelogReader` and do the `consumer.position`
call after all `seekToBeginning` calls.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang, Jason Gustafson, Ismael Juma

Closes #2769 from dguy/kafka-4937


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98d62a88
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98d62a88
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98d62a88

Branch: refs/heads/trunk
Commit: 98d62a880d91ac1b552ac3a049c5d202c2d63520
Parents: 588ed46
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Apr 20 13:36:52 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Apr 20 13:36:52 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  3 +-
 .../clients/consumer/internals/Fetcher.java     | 61 +++++++++++++-------
 .../clients/consumer/KafkaConsumerTest.java     |  6 +-
 .../internals/StoreChangelogReader.java         | 25 ++++----
 4 files changed, 59 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/98d62a88/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 15434bb..9c703b5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1269,7 +1269,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
                 throw new IllegalArgumentException("You can only check the position for partitions
assigned to this consumer.");
             Long offset = this.subscriptions.position(partition);
             if (offset == null) {
-                updateFetchPositions(Collections.singleton(partition));
+                // batch update fetch positions for any partitions without a valid position
+                updateFetchPositions(subscriptions.assignedPartitions());
                 offset = this.subscriptions.position(partition);
             }
             return offset;

http://git-wip-us.apache.org/repos/asf/kafka/blob/98d62a88/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6b0adc7..e337f4e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -71,7 +71,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -227,10 +226,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
      * @param partitions the partitions to reset
      */
     public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
+        final Set<TopicPartition> needsOffsetReset = new HashSet<>();
         for (TopicPartition tp : partitions) {
-            // TODO: If there are several offsets to reset, we could submit offset requests
in parallel
             if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
-                resetOffset(tp);
+                needsOffsetReset.add(tp);
+        }
+        if (!needsOffsetReset.isEmpty()) {
+            resetOffsets(needsOffsetReset);
         }
     }
 
@@ -240,23 +242,28 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition
and no reset policy is available
      */
     public void updateFetchPositions(Set<TopicPartition> partitions) {
+        final Set<TopicPartition> needsOffsetReset = new HashSet<>();
         // reset the fetch position to the committed position
         for (TopicPartition tp : partitions) {
             if (!subscriptions.isAssigned(tp) || subscriptions.hasValidPosition(tp))
                 continue;
 
             if (subscriptions.isOffsetResetNeeded(tp)) {
-                resetOffset(tp);
+                needsOffsetReset.add(tp);
             } else if (subscriptions.committed(tp) == null) {
                 // there's no committed position, so we need to reset with the default strategy
                 subscriptions.needOffsetReset(tp);
-                resetOffset(tp);
+                needsOffsetReset.add(tp);
             } else {
                 long committed = subscriptions.committed(tp).offset();
                 log.debug("Resetting offset for partition {} to the committed offset {}",
tp, committed);
                 subscriptions.seek(tp, committed);
             }
         }
+        
+        if (!needsOffsetReset.isEmpty()) {
+            resetOffsets(needsOffsetReset);
+        }
     }
 
     /**
@@ -357,15 +364,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
             return client.send(node, request);
     }
 
-    /**
-     * Reset offsets for the given partition using the offset reset strategy.
-     *
-     * @param partition The given partition that needs reset offset
-     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset
reset strategy is defined
-     */
-    private void resetOffset(TopicPartition partition) {
+    private long offsetResetStrategyTimestamp(final TopicPartition partition) {
         OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
-        log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
         final long timestamp;
         if (strategy == OffsetResetStrategy.EARLIEST)
             timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
@@ -373,15 +373,32 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
             timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
         else
             throw new NoOffsetForPartitionException(partition);
-        Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(
-                Collections.singletonMap(partition, timestamp), Long.MAX_VALUE, false);
-        OffsetData offsetData = offsetsByTimes.get(partition);
-        if (offsetData == null)
-            throw new NoOffsetForPartitionException(partition);
-        long offset = offsetData.offset;
-        // we might lose the assignment while fetching the offset, so check it is still active
-        if (subscriptions.isAssigned(partition))
-            this.subscriptions.seek(partition, offset);
+        return timestamp;
+    }
+
+    /**
+     * Reset offsets for the given partition using the offset reset strategy.
+     *
+     * @param partitions  The partitions that need offsets reset
+     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset
reset strategy is defined
+     */
+    private void resetOffsets(final Set<TopicPartition> partitions) {
+        final Map<TopicPartition, Long> offsetResets = new HashMap<>();
+        for (final TopicPartition partition : partitions) {
+            offsetResets.put(partition, offsetResetStrategyTimestamp(partition));
+        }
+        final Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(offsetResets,
Long.MAX_VALUE, false);
+        for (final TopicPartition partition : partitions) {
+            final OffsetData offsetData = offsetsByTimes.get(partition);
+            if (offsetData == null) {
+                throw new NoOffsetForPartitionException(partition);
+            }
+            // we might lose the assignment while fetching the offset, so check it is still
active
+            if (subscriptions.isAssigned(partition)) {
+                log.debug("Resetting offset for partition {} to {} offset.", partition, offsetData.offset);
+                this.subscriptions.seek(partition, offsetData.offset);
+            }
+        }
     }
 
     public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition,
Long> timestampsToSearch,

http://git-wip-us.apache.org/repos/asf/kafka/blob/98d62a88/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 6a9f3eb..b9600b4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1151,8 +1151,10 @@ public class KafkaConsumerTest {
         assertEquals(0, consumer.committed(tp1).offset());
 
         // fetch and verify consumer's position in the two partitions
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 3L), Errors.NONE));
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp1, 3L), Errors.NONE));
+        final Map<TopicPartition, Long> offsetResponse = new HashMap<>();
+        offsetResponse.put(tp0, 3L);
+        offsetResponse.put(tp1, 3L);
+        client.prepareResponse(listOffsetsResponse(offsetResponse, Errors.NONE));
         assertEquals(3L, consumer.position(tp0));
         assertEquals(3L, consumer.position(tp1));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/98d62a88/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 8639382..375992b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -120,24 +121,26 @@ public class StoreChangelogReader implements ChangelogReader {
             log.info("{} Starting restoring state stores from changelog topics {}", logPrefix,
needsRestoring.keySet());
 
             consumer.assign(needsRestoring.keySet());
-
+            final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
             for (final StateRestorer restorer : needsRestoring.values()) {
                 if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
                     consumer.seek(restorer.partition(), restorer.checkpoint());
                     logRestoreOffsets(restorer.partition(),
                                       restorer.checkpoint(),
                                       endOffsets.get(restorer.partition()));
+                    restorer.setStartingOffset(consumer.position(restorer.partition()));
                 } else {
                     consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
-                    logRestoreOffsets(restorer.partition(),
-                                      consumer.position(restorer.partition()),
-                                      endOffsets.get(restorer.partition()));
+                    needsPositionUpdate.add(restorer);
                 }
-                // TODO: each consumer.position() call after seekToBeginning will cause
-                // a blocking round trip to reset the position for that partition; we should
-                // batch them into a single round trip to reset for all necessary partitions
+            }
 
-                restorer.setStartingOffset(consumer.position(restorer.partition()));
+            for (final StateRestorer restorer : needsPositionUpdate) {
+                final long position = consumer.position(restorer.partition());
+                restorer.setStartingOffset(position);
+                logRestoreOffsets(restorer.partition(),
+                                  position,
+                                  endOffsets.get(restorer.partition()));
             }
 
             final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet());
@@ -154,12 +157,12 @@ public class StoreChangelogReader implements ChangelogReader {
         }
     }
 
-    private void logRestoreOffsets(final TopicPartition partition, final long checkpoint,
final Long aLong) {
+    private void logRestoreOffsets(final TopicPartition partition, final long startingOffset,
final Long endOffset) {
         log.debug("{} Restoring partition {} from offset {} to endOffset {}",
                   logPrefix,
                   partition,
-                  checkpoint,
-                  aLong);
+                  startingOffset,
+                  endOffset);
     }
 
     @Override


Mime
View raw message