kafka-commits mailing list archives

From damian...@apache.org
Subject kafka git commit: KAFKA-5986; Streams State Restoration never completes when logging is disabled
Date Fri, 29 Sep 2017 14:10:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 36556b804 -> 3107a6c5c


KAFKA-5986; Streams State Restoration never completes when logging is disabled

When logging is disabled and there are state stores, the task never transitions from
restoring to running. This is because we only check whether the task has state stores
and return false from initialization if it does. The check should instead be whether
the task has changelog partitions, i.e., whether there is anything to restore.
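
For context, a minimal reproduction sketch of the kind of application that hits this bug.
This is illustrative, not code from the commit: the class, topic, store, and application-id
names ("LoggingDisabledRepro", "input-topic", "agg-store", "logging-disabled-repro") and the
bootstrap server are hypothetical. The store is materialized with logging disabled, so it has
no changelog topic and hence no changelog partitions; before this fix the task never left the
restoring phase, and with it the application reaches RUNNING.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Reducer;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class LoggingDisabledRepro {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();

            // A store with logging disabled has no changelog topic, so there is
            // nothing to restore for it on startup.
            builder.stream("input-topic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
                   .groupByKey()
                   .reduce(new Reducer<Integer>() {
                       @Override
                       public Integer apply(final Integer value1, final Integer value2) {
                           return value1 + value2;
                       }
                   }, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("agg-store")
                           .withKeySerde(Serdes.Integer())
                           .withValueSerde(Serdes.Integer())
                           .withLoggingDisabled());

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logging-disabled-repro");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Without the fix, StreamTask#initialize() returned false whenever the
            // topology had state stores, so this app stayed stuck in REBALANCING;
            // with the fix it transitions to RUNNING once there is nothing to restore.
            new KafkaStreams(builder.build(), props).start();
        }
    }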

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>,
tedyu <yuzhihong@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3983 from dguy/restore-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3107a6c5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3107a6c5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3107a6c5

Branch: refs/heads/trunk
Commit: 3107a6c5c8d1358b8e705c5d5a16b7441d2225a6
Parents: 36556b8
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Sep 29 15:07:41 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Sep 29 15:07:41 2017 +0100

----------------------------------------------------------------------
 .../streams/processor/internals/StreamTask.java |  2 +-
 .../integration/RestoreIntegrationTest.java     | 56 ++++++++++++++-----
 .../processor/internals/StreamTaskTest.java     | 58 +++++++++++++++++++-
 3 files changed, 101 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3107a6c5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3d6c9b9..8180b2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -164,7 +164,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         initTopology();
         processorContext.initialized();
         taskInitialized = true;
-        return topology.stateStores().isEmpty();
+        return changelogPartitions().isEmpty();
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3107a6c5/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 31b7222..ae36ad8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -27,17 +27,22 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -65,19 +70,15 @@ public class RestoreIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
             new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final String inputStream = "input-stream";
+    private static final String INPUT_STREAM = "input-stream";
     private final int numberOfKeys = 10000;
     private KafkaStreams kafkaStreams;
     private String applicationId = "restore-test";
 
 
-    private void createTopics() throws InterruptedException {
-        CLUSTER.createTopic(inputStream, 2, 1);
-    }
-
-    @Before
-    public void before() throws IOException, InterruptedException {
-        createTopics();
+    @BeforeClass
+    public static void createTopics() throws InterruptedException {
+        CLUSTER.createTopic(INPUT_STREAM, 2, 1);
     }
 
     private Properties props() {
@@ -107,7 +108,7 @@ public class RestoreIntegrationTest {
 
         createStateForRestoration();
 
-        builder.table(inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+        builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()))
                 .toStream()
                 .foreach(new ForeachAction<Integer, Integer>() {
                     @Override
@@ -153,6 +154,35 @@ public class RestoreIntegrationTest {
     }
 
 
+    @Test
+    public void shouldSuccessfullyStartWhenLoggingDisabled() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, Integer> stream = builder.stream(INPUT_STREAM);
+        stream.groupByKey()
+                .reduce(new Reducer<Integer>() {
+                    @Override
+                    public Integer apply(final Integer value1, final Integer value2) {
+                        return value1 + value2;
+                    }
+                }, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
+
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        kafkaStreams = new KafkaStreams(builder.build(), props());
+        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    startupLatch.countDown();
+                }
+            }
+        });
+
+        kafkaStreams.start();
+
+        assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
+    }
+
     private void createStateForRestoration()
             throws ExecutionException, InterruptedException {
         final Properties producerConfig = new Properties();
@@ -162,7 +192,7 @@ public class RestoreIntegrationTest {
                     new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
 
             for (int i = 0; i < numberOfKeys; i++) {
-                producer.send(new ProducerRecord<>(inputStream, i, i));
+                producer.send(new ProducerRecord<>(INPUT_STREAM, i, i));
             }
         }
 
@@ -173,8 +203,8 @@ public class RestoreIntegrationTest {
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
 
         final Consumer consumer = new KafkaConsumer(consumerConfig);
-        final List<TopicPartition> partitions = Arrays.asList(new TopicPartition(inputStream, 0),
-                                                              new TopicPartition(inputStream, 1));
+        final List<TopicPartition> partitions = Arrays.asList(new TopicPartition(INPUT_STREAM, 0),
+                                                              new TopicPartition(INPUT_STREAM, 1));
 
         consumer.assign(partitions);
         consumer.seekToEnd(partitions);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3107a6c5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index bbed615..897a2c5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -53,6 +53,7 @@ import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockStateRestoreListener;
+import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -125,7 +126,6 @@ public class StreamTaskTest {
     private final MockTime time = new MockTime();
     private File baseDir = TestUtils.tempDirectory();
     private StateDirectory stateDirectory;
-    private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId", new LogContext("taskId "));
     private StreamsConfig config;
     private StreamsConfig eosConfig;
     private StreamTask task;
@@ -1018,6 +1018,62 @@ public class StreamTaskTest {
         }
     }
 
+    @Test
+    public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>singletonList(source1),
+                                                                 Collections.<String, SourceNode>singletonMap(topic1[0], source1),
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>singletonList(
+                                                                         new MockStateStoreSupplier.MockStateStore("store", false)),
+                                                                 Collections.<String, String>emptyMap(),
+                                                                 Collections.<StateStore>emptyList());
+
+
+        final StreamTask task = new StreamTask(taskId00,
+                                               applicationId,
+                                               Utils.mkSet(partition1),
+                                               topology,
+                                               consumer,
+                                               changelogReader,
+                                               config,
+                                               streamsMetrics,
+                                               stateDirectory,
+                                               null,
+                                               time,
+                                               producer);
+
+        assertTrue(task.initialize());
+    }
+
+    @Test
+    public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>singletonList(source1),
+                                                                 Collections.<String, SourceNode>singletonMap(topic1[0], source1),
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>singletonList(
+                                                                         new MockStateStoreSupplier.MockStateStore("store", false)),
+                                                                 Collections.singletonMap("store", "changelog"),
+                                                                 Collections.<StateStore>emptyList());
+
+
+        final StreamTask task = new StreamTask(taskId00,
+                                               applicationId,
+                                               Utils.mkSet(partition1),
+                                               topology,
+                                               consumer,
+                                               changelogReader,
+                                               config,
+                                               streamsMetrics,
+                                               stateDirectory,
+                                               null,
+                                               time,
+                                               producer);
+
+        assertFalse(task.initialize());
+    }
+
     @SuppressWarnings("unchecked")
     private StreamTask createTaskThatThrowsExceptionOnClose() {
         final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {

