kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5986; Streams State Restoration never completes when logging is disabled
Date Fri, 29 Sep 2017 14:24:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 84bc74a4a -> fae2d2386


KAFKA-5986; Streams State Restoration never completes when logging is disabled

When logging is disabled and there are state stores the task never transitions from restoring
to running. This is because we only ever check if the task has state stores and return false
on initialization if it does. The check should be if we have changelog partitions, i.e., we
need to restore.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>,
tedyu <yuzhihong@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3983 from dguy/restore-test

(cherry picked from commit 3107a6c5c8d1358b8e705c5d5a16b7441d2225a6)
Signed-off-by: Damian Guy <damian.guy@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fae2d238
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fae2d238
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fae2d238

Branch: refs/heads/0.11.0
Commit: fae2d23868e22ee2e6cd59809db0a6defc3734bc
Parents: 84bc74a
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Sep 29 15:07:41 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Sep 29 15:24:32 2017 +0100

----------------------------------------------------------------------
 .../streams/processor/internals/StreamTask.java | 18 ++++---
 .../processor/internals/StreamTaskTest.java     | 57 ++++++++++++++++++++
 2 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fae2d238/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 149b938..de45800 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -146,6 +146,16 @@ public class StreamTask extends AbstractTask implements Punctuator {
         }
     }
 
+    @Override
+    public boolean initialize() {
+        log.trace("Initializing");
+        initializeStateStores();
+        initTopology();
+        processorContext.initialized();
+        taskInitialized = true;
+        return changelogPartitions().isEmpty();
+    }
+
     /**
      * <pre>
      * - re-initialize the task
@@ -561,12 +571,4 @@ public class StreamTask extends AbstractTask implements Punctuator {
         return new RecordCollectorImpl(producer, id.toString());
     }
 
-    public boolean initialize() {
-        log.debug("{} Initializing", logPrefix);
-        initializeStateStores();
-        initTopology();
-        processorContext.initialized();
-        return topology.stateStores().isEmpty();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fae2d238/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index cff145e..781979e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
+import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -800,6 +801,62 @@ public class StreamTaskTest {
         }
     }
 
+    @Test
+    public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>singletonList(source1),
+                                                                 Collections.<String,
SourceNode>singletonMap(topic1[0], source1),
+                                                                 Collections.<String,
SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>singletonList(
+                                                                         new MockStateStoreSupplier.MockStateStore("store",
+                                                                                        
                          false)),
+                                                                 Collections.<String,
String>emptyMap(),
+                                                                 Collections.<StateStore>emptyList());
+
+
+        final StreamTask task = new StreamTask(taskId00,
+                                               applicationId,
+                                               Utils.mkSet(partition1),
+                                               topology,
+                                               consumer,
+                                               changelogReader,
+                                               config,
+                                               streamsMetrics,
+                                               stateDirectory,
+                                               null,
+                                               time,
+                                               producer);
+
+        assertTrue(task.initialize());
+    }
+
+    @Test
+    public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>singletonList(source1),
+                                                                 Collections.<String,
SourceNode>singletonMap(topic1[0], source1),
+                                                                 Collections.<String,
SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>singletonList(
+                                                                         new MockStateStoreSupplier.MockStateStore("store",
+                                                                                        
                          false)),
+                                                                 Collections.singletonMap("store",
"changelog"),
+                                                                 Collections.<StateStore>emptyList());
+
+
+        final StreamTask task = new StreamTask(taskId00,
+                                               applicationId,
+                                               Utils.mkSet(partition1),
+                                               topology,
+                                               consumer,
+                                               changelogReader,
+                                               config,
+                                               streamsMetrics,
+                                               stateDirectory,
+                                               null,
+                                               time,
+                                               producer);
+
+        assertFalse(task.initialize());
+    }
+
 
     @SuppressWarnings("unchecked")
     private StreamTask createTaskThatThrowsExceptionOnClose() {


Mime
View raw message