kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-5797: Handle metadata not available in store registration
Date Fri, 08 Sep 2017 16:07:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 d93bd1aff -> 5f0c060e0


KAFKA-5797: Handle metadata not available in store registration

This is the backport of trunk PR #3748 to the 0.11.0 branch

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3806 from guozhangwang/K5797-handle-metadata-available-0110
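
For readers skimming the diff below: the fix removes the eager validatePartitionExists()
call from store registration (it threw a StreamsException whenever broker metadata could
not be fetched in time) and instead initializes changelog partitions lazily inside
restore(). Partitions whose metadata is not yet available stay in the needsInitializing
map and are retried on the next run loop. The following is a minimal sketch of that retry
pattern, not the actual implementation; MetadataClient is a hypothetical stand-in for the
restore consumer's end-offset lookup.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class DeferredInitializationSketch {

        // Hypothetical stand-in for the restore consumer's end-offset metadata lookup.
        interface MetadataClient {
            // Returns end offsets for the given partitions; throws RuntimeException on timeout.
            Map<String, Long> endOffsets(Set<String> partitions);
        }

        private final MetadataClient client;
        private final Map<String, Long> endOffsets = new HashMap<>();
        private final Set<String> needsInitializing = new HashSet<>();
        private final Set<String> needsRestoring = new HashSet<>();

        public DeferredInitializationSketch(final MetadataClient client) {
            this.client = client;
        }

        // Registration no longer talks to the brokers, so it cannot fail on missing metadata.
        public void register(final String partition) {
            needsInitializing.add(partition);
        }

        // Called once per run loop; anything still uninitialized is retried here.
        public void restore() {
            if (!needsInitializing.isEmpty()) {
                initialize();
            }
            // ... poll and apply changelog records for the partitions in needsRestoring ...
        }

        private void initialize() {
            final Map<String, Long> offsets;
            try {
                offsets = client.endOffsets(needsInitializing);
            } catch (final RuntimeException timeout) {
                // Metadata not available yet: give up for this run loop and retry next time,
                // instead of throwing a fatal exception as the old eager validation did.
                return;
            }
            endOffsets.putAll(offsets);
            needsRestoring.addAll(offsets.keySet());
            needsInitializing.removeAll(offsets.keySet());
        }
    }

The key design point is that a metadata timeout becomes a transient condition handled by
the run loop rather than a fatal error at registration time.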


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5f0c060e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5f0c060e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5f0c060e

Branch: refs/heads/0.11.0
Commit: 5f0c060e0cc5d2fc483f83fbf556ad77b3ae0e96
Parents: d93bd1a
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Sep 8 09:07:00 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Sep 8 09:07:00 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      |   2 +-
 .../processor/internals/ChangelogReader.java    |  12 +-
 .../internals/ProcessorStateManager.java        |   1 -
 .../internals/StoreChangelogReader.java         | 146 ++++++++++---------
 .../processor/internals/StreamThread.java       |  18 +--
 .../processor/internals/AbstractTaskTest.java   |   3 +-
 .../processor/internals/AssignedTasksTest.java  |  15 +-
 .../processor/internals/StandbyTaskTest.java    |   3 +-
 .../internals/StoreChangelogReaderTest.java     |  93 ++----------
 .../processor/internals/StreamTaskTest.java     |   3 +-
 .../processor/internals/StreamThreadTest.java   |  69 +++------
 .../StreamThreadStateStoreProviderTest.java     |   2 +-
 .../apache/kafka/test/MockChangelogReader.java  |   9 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   7 +-
 14 files changed, 142 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 1e9ec60..cb3404f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -95,7 +95,7 @@ class AssignedTasks<T extends AbstractTask> {
                 it.remove();
             } catch (final LockException e) {
                 // made this trace as it will spam the logs in the poll loop.
-                log.trace("{} Could not create {} {} due to {}; will retry", logPrefix, taskTypeName, entry.getKey(), e.getMessage());
+                log.trace("{} Could not create {} {} due to {}; will retry in the next run loop", logPrefix, taskTypeName, entry.getKey(), e.getMessage());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index d8ed35e..5ebc34c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -28,18 +28,10 @@ import java.util.Map;
  */
 public interface ChangelogReader {
     /**
-     * Validate that the partition exists on the cluster.
-     * @param topicPartition    partition to validate.
-     * @param storeName         name of the store the partition is for.
-     * @throws org.apache.kafka.streams.errors.StreamsException if partition doesn't exist
-     */
-    void validatePartitionExists(final TopicPartition topicPartition, final String storeName);
-
-    /**
      * Register a state store and its partition for later restoration.
-     * @param restorationInfo
+     * @param restorer the state restorer to register
      */
-    void register(final StateRestorer restorationInfo);
+    void register(final StateRestorer restorer);
 
     /**
      * Restore all registered state stores by reading from their changelogs.

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 110b03d..34a87ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -155,7 +155,6 @@ public class ProcessorStateManager implements StateManager {
         }
 
         final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic));
-        changelogReader.validatePartitionExists(storePartition, store.name());
 
         if (isStandby) {
             if (store.persistent()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 9ae75f2..003ded8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -22,8 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.errors.StreamsException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +31,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,59 +39,23 @@ import java.util.Set;
 public class StoreChangelogReader implements ChangelogReader {
     private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class);
 
-    private final Consumer<byte[], byte[]> consumer;
     private final String logPrefix;
-    private final Time time;
-    private final long partitionValidationTimeoutMs;
+    private final Consumer<byte[], byte[]> consumer;
+    private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
-    private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
-    private boolean initialized = false;
 
-    public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
-        this.time = time;
+    public StoreChangelogReader(final String threadId,
+                                final Consumer<byte[], byte[]> consumer) {
         this.consumer = consumer;
-        this.partitionValidationTimeoutMs = partitionValidationTimeoutMs;
 
         this.logPrefix = String.format("stream-thread [%s]", threadId);
     }
 
-    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
-        this("", consumer, time, partitionValidationTimeoutMs);
-    }
-
-    @Override
-    public void validatePartitionExists(final TopicPartition topicPartition, final String storeName) {
-        final long start = time.milliseconds();
-        // fetch all info on all topics to avoid multiple remote calls
-        if (partitionInfo.isEmpty()) {
-            try {
-                partitionInfo.putAll(consumer.listTopics());
-            } catch (final TimeoutException e) {
-                log.warn("{} Could not list topics so will fall back to partition by partition fetching", logPrefix);
-            }
-        }
-
-        final long endTime = time.milliseconds() + partitionValidationTimeoutMs;
-        while (!hasPartition(topicPartition) && time.milliseconds() < endTime) {
-            try {
-                final List<PartitionInfo> partitions = consumer.partitionsFor(topicPartition.topic());
-                if (partitions != null) {
-                    partitionInfo.put(topicPartition.topic(), partitions);
-                }
-            } catch (final TimeoutException e) {
-                throw new StreamsException(String.format("Could not fetch partition info
for topic: %s before expiration of the configured request timeout",
-                                                         topicPartition.topic()));
-            }
-        }
-
-        if (!hasPartition(topicPartition)) {
-            throw new StreamsException(String.format("Store %s's change log (%s) does not
contain partition %s",
-                                                     storeName, topicPartition.topic(), topicPartition.partition()));
-        }
-        log.debug("{} Took {}ms to validate that partition {} exists", logPrefix, time.milliseconds()
- start, topicPartition);
+    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer) {
+        this("", consumer);
     }
 
     @Override
@@ -100,7 +64,6 @@ public class StoreChangelogReader implements ChangelogReader {
         needsInitializing.put(restorer.partition(), restorer);
     }
 
-
     public Collection<TopicPartition> restore() {
         if (!needsInitializing.isEmpty()) {
             initialize();
@@ -125,42 +88,75 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private void initialize() {
-        final Map<TopicPartition, StateRestorer> newTasksNeedingRestoration = new HashMap<>();
-
         if (!consumer.subscription().isEmpty()) {
-            throw new IllegalStateException(String.format("Restore consumer should have not subscribed to any partitions (%s) beforehand", consumer.subscription()));
+            throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")");
+        }
+
+        // first refresh the changelog partition information from brokers, since initialize is only called when
+        // the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet
+        refreshChangelogInfo();
+
+        Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
+        for (Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
+            final TopicPartition topicPartition = entry.getKey();
+            if (hasPartition(topicPartition)) {
+                initializable.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        // try to fetch end offsets for the initializable restorers and remove any partitions
+        // where we already have all of the data
+        try {
+            endOffsets.putAll(consumer.endOffsets(initializable.keySet()));
+        } catch (final TimeoutException e) {
+            // if timeout exception gets thrown we just give up this time and retry in the next run loop
+            log.debug("{} Could not fetch end offset for {}; will fall back to partition by partition fetching", logPrefix, initializable);
+            return;
         }
-        endOffsets.putAll(consumer.endOffsets(needsInitializing.keySet()));
-
-        // remove any partitions where we already have all of the data
-        for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
-            TopicPartition topicPartition = entry.getKey();
-            Long offset = entry.getValue();
-            final StateRestorer restorer = needsInitializing.get(topicPartition);
-            // might be null as has was initialized in a previous invocation.
-            if (restorer != null) {
-                if (restorer.checkpoint() >= offset) {
+
+        final Iterator<TopicPartition> iter = initializable.keySet().iterator();
+        while (iter.hasNext()) {
+            final TopicPartition topicPartition = iter.next();
+            final Long endOffset = endOffsets.get(topicPartition);
+
+            // offset should not be null; but since the consumer API does not guarantee it
+            // we add this check just in case
+            if (endOffset != null) {
+                final StateRestorer restorer = needsInitializing.get(topicPartition);
+                if (restorer.checkpoint() >= endOffset) {
                     restorer.setRestoredOffset(restorer.checkpoint());
-                } else if (restorer.offsetLimit() == 0 || endOffsets.get(topicPartition) == 0) {
+                    iter.remove();
+                } else if (restorer.offsetLimit() == 0 || endOffset == 0) {
                     restorer.setRestoredOffset(0);
-                } else {
-                    newTasksNeedingRestoration.put(topicPartition, restorer);
+                    iter.remove();
                 }
+                needsInitializing.remove(topicPartition);
+            } else {
+                log.info("{} End offset cannot be found from the returned metadata; removing this partition from the current run loop", logPrefix);
+                iter.remove();
             }
         }
 
-        log.debug("{} Starting restoring state stores from changelog topics {}", logPrefix,
newTasksNeedingRestoration.keySet());
+        // set up restorers for those initializable partitions
+        if (!initializable.isEmpty()) {
+            startRestoration(initializable);
+        }
+    }
+
+    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
+        log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
-        assignment.addAll(newTasksNeedingRestoration.keySet());
+        assignment.addAll(initialized.keySet());
         consumer.assign(assignment);
+
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
-        for (final StateRestorer restorer : newTasksNeedingRestoration.values()) {
+        for (final StateRestorer restorer : initialized.values()) {
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
                 consumer.seek(restorer.partition(), restorer.checkpoint());
                 logRestoreOffsets(restorer.partition(),
-                                  restorer.checkpoint(),
-                                  endOffsets.get(restorer.partition()));
+                        restorer.checkpoint(),
+                        endOffsets.get(restorer.partition()));
                 restorer.setStartingOffset(consumer.position(restorer.partition()));
             } else {
                 consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
@@ -170,14 +166,13 @@ public class StoreChangelogReader implements ChangelogReader {
 
         for (final StateRestorer restorer : needsPositionUpdate) {
             final long position = consumer.position(restorer.partition());
-            restorer.setStartingOffset(position);
             logRestoreOffsets(restorer.partition(),
-                              position,
-                              endOffsets.get(restorer.partition()));
+                    position,
+                    endOffsets.get(restorer.partition()));
+            restorer.setStartingOffset(position);
         }
 
-        needsRestoring.putAll(newTasksNeedingRestoration);
-        needsInitializing.clear();
+        needsRestoring.putAll(initialized);
     }
 
     private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) {
@@ -195,6 +190,14 @@ public class StoreChangelogReader implements ChangelogReader {
         return completed;
     }
 
+    private void refreshChangelogInfo() {
+        try {
+            partitionInfo.putAll(consumer.listTopics());
+        } catch (final TimeoutException e) {
+            log.debug("{} Could not fetch topic metadata within the timeout, will retry in
the next run loop", logPrefix);
+        }
+    }
+
     @Override
     public Map<TopicPartition, Long> restoredOffsets() {
         final Map<TopicPartition, Long> restoredOffsets = new HashMap<>();
@@ -271,6 +274,5 @@ public class StoreChangelogReader implements ChangelogReader {
         }
 
         return false;
-
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 69cb328..c0acae5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -425,23 +425,23 @@ public class StreamThread extends Thread {
 
         consumer = clientSupplier.getConsumer(consumerConfigs);
         log.info("{} Creating restore consumer client", logPrefix);
+
         restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId));
         // standby KTables
         standbyRecords = new HashMap<>();
 
 
         this.stateDirectory = stateDirectory;
-        pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
-        commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+        this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+        this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
 
         this.time = time;
-        timerStartedMs = time.milliseconds();
-        lastCommitMs = timerStartedMs;
-        final Integer requestTimeOut = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-        rebalanceListener = new RebalanceListener(time);
-        active = new AssignedTasks<>(logPrefix, "stream task", Time.SYSTEM);
-        standby = new AssignedTasks<>(logPrefix, "standby task", Time.SYSTEM);
-        storeChangelogReader = new StoreChangelogReader(getName(), restoreConsumer, time, requestTimeOut);
+        this.timerStartedMs = time.milliseconds();
+        this.lastCommitMs = timerStartedMs;
+        this.rebalanceListener = new RebalanceListener(time);
+        this.active = new AssignedTasks<>(logPrefix, "stream task", Time.SYSTEM);
+        this.standby = new AssignedTasks<>(logPrefix, "standby task", Time.SYSTEM);
+        this.storeChangelogReader = new StoreChangelogReader(getName(), restoreConsumer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 3e2fc39..8ba22f4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -119,7 +118,7 @@ public class AbstractTaskTest {
                                                       Collections.<String, String>emptyMap(),
                                                       Collections.<StateStore>emptyList()),
                                 consumer,
-                                new StoreChangelogReader(consumer, Time.SYSTEM, 5000),
+                                new StoreChangelogReader(consumer),
                                 false,
                                 stateDirectory,
                                 config) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 4daff64..4904594 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -66,8 +66,8 @@ public class AssignedTasksTest {
     public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
         EasyMock.expect(t1.hasStateStores()).andReturn(true);
         EasyMock.expect(t2.hasStateStores()).andReturn(true);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
         EasyMock.replay(t1, t2);
 
         assignedTasks.addNewTask(t1);
@@ -82,6 +82,8 @@ public class AssignedTasksTest {
     public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
         EasyMock.expect(t1.hasStateStores()).andReturn(false);
         EasyMock.expect(t2.hasStateStores()).andReturn(false);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
         EasyMock.replay(t1, t2);
 
         assignedTasks.addNewTask(t1);
@@ -95,6 +97,7 @@ public class AssignedTasksTest {
     @Test
     public void shouldInitializeNewTasks() {
         EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.replay(t1);
 
         assignedTasks.addNewTask(t1);
@@ -107,7 +110,8 @@ public class AssignedTasksTest {
     public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
         EasyMock.expect(t1.initialize()).andReturn(false);
         EasyMock.expect(t2.initialize()).andReturn(true);
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
+        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
 
         EasyMock.replay(t1, t2);
@@ -125,7 +129,7 @@ public class AssignedTasksTest {
     @Test
     public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
         EasyMock.expect(t2.initialize()).andReturn(true);
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
+        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)).anyTimes();
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
 
         EasyMock.replay(t2);
@@ -169,6 +173,7 @@ public class AssignedTasksTest {
     public void shouldCloseUnInitializedTasksOnSuspend() {
         t1.close(false, false);
         EasyMock.expectLastCall();
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.replay(t1);
 
         assignedTasks.addNewTask(t1);
@@ -217,7 +222,7 @@ public class AssignedTasksTest {
 
     private void mockInitializedTask() {
         EasyMock.expect(t1.initialize()).andReturn(true);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
+        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 8621470..329aab2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -126,7 +125,7 @@ public class StandbyTaskTest {
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 3144bb3..a43f083 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -22,11 +22,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.hamcrest.CoreMatchers;
 import org.junit.Test;
@@ -35,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
@@ -44,88 +42,28 @@ import static org.junit.Assert.fail;
 public class StoreChangelogReaderTest {
 
     private final MockRestoreCallback callback = new MockRestoreCallback();
-    private MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0);
+    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer);
     private final TopicPartition topicPartition = new TopicPartition("topic", 0);
-    private final PartitionInfo partitionInfo = new PartitionInfo(topicPartition.topic(), 0, null, null, null);
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws Exception {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
+    public void shouldRequestTopicsAndHandleTimeoutException() throws Exception {
+        final AtomicBoolean functionCalled = new AtomicBoolean(false);
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
             public Map<String, List<PartitionInfo>> listTopics() {
+                functionCalled.set(true);
                 throw new TimeoutException("KABOOM!");
             }
         };
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0);
-        try {
-            changelogReader.validatePartitionExists(topicPartition, "store");
-            fail("Should have thrown streams exception");
-        } catch (final StreamsException e) {
-            // pass
-        }
-    }
-
-    @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfPartitionDoesntExistAfterMaxWait() throws Exception {
-        changelogReader.validatePartitionExists(topicPartition, "store");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldFallbackToPartitionsForIfPartitionNotInAllPartitionsList() throws Exception {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public List<PartitionInfo> partitionsFor(final String topic) {
-                return Collections.singletonList(partitionInfo);
-            }
-        };
 
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 10);
-        changelogReader.validatePartitionExists(topicPartition, "store");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldThrowStreamsExceptionIfTimeoutOccursDuringPartitionsFor() throws Exception {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public List<PartitionInfo> partitionsFor(final String topic) {
-                throw new TimeoutException("KABOOM!");
-            }
-        };
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 5);
-        try {
-            changelogReader.validatePartitionExists(topicPartition, "store");
-            fail("Should have thrown streams exception");
-        } catch (final StreamsException e) {
-            // pass
-        }
-    }
-
-    @Test
-    public void shouldPassIfTopicPartitionExists() throws Exception {
-        consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));
-        changelogReader.validatePartitionExists(topicPartition, "store");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldRequestPartitionInfoIfItDoesntExist() throws Exception {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<String, List<PartitionInfo>> listTopics() {
-                return Collections.emptyMap();
-            }
-        };
-
-        consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Time.SYSTEM, 5000);
-        changelogReader.validatePartitionExists(topicPartition, "store");
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer);
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+        changelogReader.restore();
+        assertTrue(functionCalled.get());
     }
 
-
     @Test
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception {
         consumer.subscribe(Collections.singleton("sometopic"));
@@ -172,7 +110,6 @@ public class StoreChangelogReaderTest {
         setupConsumer(10, topicPartition);
         final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true);
         changelogReader.register(restorer);
-
         changelogReader.restore();
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
@@ -315,18 +252,18 @@ public class StoreChangelogReaderTest {
 
         assertTrue(changelogReader.restore().isEmpty());
 
+        addRecords(9, topicPartition, 1);
+
         final TopicPartition postInitialization = new TopicPartition("other", 0);
+        setupConsumer(3, postInitialization);
         consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
 
         changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false));
 
-        addRecords(9, topicPartition, 1);
-
         final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
-
         consumer.assign(expected);
-        addRecords(3, postInitialization, 0);
+
         assertThat(changelogReader.restore(), equalTo(expected));
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackTwo.restored.size(), equalTo(3));

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index ee3f778..cff145e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
@@ -104,7 +103,7 @@ public class StreamTaskTest {
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer);
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
     private final String applicationId = "applicationId";

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9209b56..230d8bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -82,7 +82,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class StreamThreadTest {
 
@@ -92,11 +91,11 @@ public class StreamThreadTest {
     private final Metrics metrics = new Metrics();
     private final MockClientSupplier clientSupplier = new MockClientSupplier();
     private UUID processId = UUID.randomUUID();
-    final KStreamBuilder builder = new KStreamBuilder();
+    private final KStreamBuilder builder = new KStreamBuilder();
     private final StreamsConfig config = new StreamsConfig(configProps(false));
     private final String stateDir = TestUtils.tempDirectory().getPath();
     private final StateDirectory stateDirectory  = new StateDirectory("applicationId", stateDir, mockTime);
-    private final ChangelogReader changelogReader = new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000);
+    private final ChangelogReader changelogReader = new StoreChangelogReader(clientSupplier.restoreConsumer);
 
     @Before
     public void setUp() throws Exception {
@@ -151,9 +150,8 @@ public class StreamThreadTest {
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
-    private final TaskId task3 = new TaskId(0, 3);
-    private final TaskId task4 = new TaskId(1, 1);
-    private final TaskId task5 = new TaskId(1, 2);
+    private final TaskId task3 = new TaskId(1, 1);
+    private final TaskId task4 = new TaskId(1, 2);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
@@ -396,10 +394,10 @@ public class StreamThreadTest {
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
 
+        assertTrue(thread.tasks().containsKey(task3));
         assertTrue(thread.tasks().containsKey(task4));
-        assertTrue(thread.tasks().containsKey(task5));
-        assertEquals(expectedGroup1, thread.tasks().get(task4).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
+        assertEquals(expectedGroup1, thread.tasks().get(task3).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke four partitions and assign three partitions of both subtopologies
@@ -410,14 +408,14 @@ public class StreamThreadTest {
         expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
         expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
         activeTasks.put(new TaskId(0, 1), expectedGroup1);
-        activeTasks.remove(task5);
+        activeTasks.remove(task4);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task4));
+        assertTrue(thread.tasks().containsKey(task3));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke all three partitions and reassign the same three partitions (from different subtopologies)
@@ -430,9 +428,9 @@ public class StreamThreadTest {
         thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task4));
+        assertTrue(thread.tasks().containsKey(task3));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke all partitions and assign nothing
@@ -1579,12 +1577,8 @@ public class StreamThreadTest {
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
         thread.runOnce(-1);
-        try {
-            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-            fail("should have thrown exception");
-        } catch (final Exception e) {
-            // expected
-        }
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+
         assertFalse(testStreamTask.committed);
     }
 
@@ -1641,12 +1635,8 @@ public class StreamThreadTest {
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
         thread.runOnce(-1);
-        try {
-            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-            fail("should have thrown exception");
-        } catch (final Exception e) {
-            // expected
-        }
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+
         assertFalse(testStreamTask.committed);
     }
 
@@ -1745,15 +1735,7 @@ public class StreamThreadTest {
         final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
         final StreamThread thread = setupTest(taskId, stateDirMock);
 
-
-        try {
-            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-            fail("Should have thrown exception");
-        } catch (final Exception e) {
-            //
-        } finally {
-            thread.close();
-        }
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         EasyMock.verify(stateDirMock);
     }
@@ -1764,9 +1746,10 @@ public class StreamThreadTest {
         final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
 
         final StreamThread thread = setupTest(taskId, stateDirMock);
-        thread.start();
+
         thread.close();
-        thread.join();
+        thread.shutdown(true);
+
         EasyMock.verify(stateDirMock);
     }
 
@@ -1840,14 +1823,8 @@ public class StreamThreadTest {
         final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
         final StreamThread thread = setupStandbyTest(taskId, stateDirMock);
 
-        try {
-            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
-            fail("Should have thrown exception");
-        } catch (final Exception e) {
-            // ok
-        } finally {
-            thread.close();
-        }
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+
         EasyMock.verify(stateDirMock);
     }
 
@@ -1916,7 +1893,7 @@ public class StreamThreadTest {
                     partitions,
                     builder.build(0),
                     clientSupplier.consumer,
-                    new StoreChangelogReader(getName(), clientSupplier.restoreConsumer, mockTime, 1000),
+                    new StoreChangelogReader(getName(), clientSupplier.restoreConsumer),
                     StreamThreadTest.this.config,
                     new StreamsMetricsImpl(new Metrics(), "groupName", Collections.<String, String>emptyMap()),
                     stateDirectory) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 549c67c..e24f535 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -201,7 +201,7 @@ public class StreamThreadStateStoreProviderTest {
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000),
+            new StoreChangelogReader(clientSupplier.restoreConsumer),
             streamsConfig,
             new MockStreamsMetrics(new Metrics()),
             stateDirectory,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
index 86c0eb5..54fd858 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
@@ -30,13 +30,8 @@ public class MockChangelogReader implements ChangelogReader {
     private final Set<TopicPartition> registered = new HashSet<>();
 
     @Override
-    public void validatePartitionExists(final TopicPartition topicPartition, final String storeName) {
-
-    }
-
-    @Override
-    public void register(final StateRestorer restorationInfo) {
-        registered.add(restorationInfo.partition());
+    public void register(final StateRestorer restorer) {
+        registered.add(restorer.partition());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f0c060e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index fa09c01..228171a 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -216,10 +216,7 @@ public class ProcessorTopologyTestDriver {
                                   partitionsByTopic.values(),
                                   topology,
                                   consumer,
-                                  new StoreChangelogReader(
-                                      createRestoreConsumer(topology.storeToChangelogTopic()),
-                                      Time.SYSTEM,
-                                      5000),
+                                  new StoreChangelogReader(createRestoreConsumer(topology.storeToChangelogTopic())),
                                   config,
                                   streamsMetrics, stateDirectory,
                                   cache,
@@ -361,7 +358,7 @@ public class ProcessorTopologyTestDriver {
      * @return the state store, or null if no store has been registered with the given name
      * @see #getKeyValueStore(String)
      */
-    public StateStore getStateStore(final String name) {
+    private StateStore getStateStore(final String name) {
         return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
     }
 

