kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: remove duplicate map in StoreChangelogReader (#5143)
Date Fri, 08 Jun 2018 01:47:55 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bb260e9  MINOR: remove duplicate map in StoreChangelogReader (#5143)
bb260e9 is described below

commit bb260e924f1fada8f6f138b13d809b58dcc85b2c
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Thu Jun 7 18:47:49 2018 -0700

    MINOR: remove duplicate map in StoreChangelogReader (#5143)
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>,
Bill Bejeck <bill@confluent.io>
---
 .../processor/internals/StoreChangelogReader.java        | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index af5ff47..bb0ed06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -49,7 +49,6 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
-    private Map<TopicPartition, Long> updatedEndOffsets = new HashMap<>();
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
                                 final StateRestoreListener userStateRestoreListener,
@@ -69,9 +68,6 @@ public class StoreChangelogReader implements ChangelogReader {
     public Collection<TopicPartition> restore(final RestoringTasks active) {
         if (!needsInitializing.isEmpty()) {
             initialize();
-            final Set<TopicPartition> remainingPartitions = new HashSet<>(needsRestoring.keySet());
-            remainingPartitions.removeAll(updatedEndOffsets.keySet());
-            updatedEndOffsets.putAll(restoreConsumer.endOffsets(remainingPartitions));
         }
 
         if (needsRestoring.isEmpty()) {
@@ -85,11 +81,11 @@ public class StoreChangelogReader implements ChangelogReader {
             while (iterator.hasNext()) {
                 final TopicPartition partition = iterator.next();
                 final StateRestorer restorer = stateRestorers.get(partition);
-                final long pos = processNext(records.records(partition), restorer, updatedEndOffsets.get(partition));
+                final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition));
                 restorer.setRestoredOffset(pos);
-                if (restorer.hasCompleted(pos, updatedEndOffsets.get(partition))) {
+                if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
                     restorer.restoreDone();
-                    updatedEndOffsets.remove(partition);
+                    endOffsets.remove(partition);
                     iterator.remove();
                 }
             }
@@ -120,8 +116,8 @@ public class StoreChangelogReader implements ChangelogReader {
         // the needsInitializing map is not empty, meaning we do not know the metadata for
some of them yet
         refreshChangelogInfo();
 
-        Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
-        for (Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet())
{
+        final Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
+        for (final Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet())
{
             final TopicPartition topicPartition = entry.getKey();
             if (hasPartition(topicPartition)) {
                 initializable.put(entry.getKey(), entry.getValue());
@@ -253,7 +249,7 @@ public class StoreChangelogReader implements ChangelogReader {
                              final Long endOffset) {
         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
         long nextPosition = -1;
-        int numberRecords = records.size();
+        final int numberRecords = records.size();
         int numberRestored = 0;
         long lastRestoredOffset = -1;
         for (final ConsumerRecord<byte[], byte[]> record : records) {

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message