kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [1/3] kafka git commit: KAFKA-5152; move state restoration out of rebalance and into poll loop
Date Wed, 16 Aug 2017 10:14:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 22c9a8cb4 -> b268322ed


http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 28ead02..f4a0400 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -114,7 +114,6 @@ public class StreamTaskTest {
     private final MockTime time = new MockTime();
     private File baseDir = TestUtils.tempDirectory();
     private StateDirectory stateDirectory;
-    private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId");
     private StreamsConfig config;
     private StreamsConfig eosConfig;
     private StreamTask task;
@@ -147,6 +146,7 @@ public class StreamTaskTest {
         stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime());
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
                               changelogReader, config, streamsMetrics, stateDirectory, null, time, producer);
+        task.initialize();
     }
 
     @After
@@ -377,6 +377,7 @@ public class StreamTaskTest {
 
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
+        task.initialize();
         final int offset = 20;
         task.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -494,6 +495,7 @@ public class StreamTaskTest {
                 };
             }
         };
+        streamTask.initialize();
 
         time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
 
@@ -611,6 +613,7 @@ public class StreamTaskTest {
     public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
         task.close(true);
         task = createTaskThatThrowsExceptionOnClose();
+        task.initialize();
         try {
             task.close(true);
             fail("should have thrown runtime exception");
@@ -777,6 +780,17 @@ public class StreamTaskTest {
         assertTrue(producer.closed());
     }
 
+    @Test
+    public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
+        final StreamTask task = createTaskThatThrowsExceptionOnClose();
+        try {
+            task.close(true);
+        } catch (Exception e) {
+            fail("should have not closed unitialized topology");
+        }
+    }
+
+
     @SuppressWarnings("unchecked")
     private StreamTask createTaskThatThrowsExceptionOnClose() {
         final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index ded1bfd..a3ffdda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -96,6 +97,7 @@ public class StreamThreadTest {
     private final StreamsConfig config = new StreamsConfig(configProps(false));
     private final String stateDir = TestUtils.tempDirectory().getPath();
     private final StateDirectory stateDirectory  = new StateDirectory("applicationId", stateDir, mockTime);
+    private final ChangelogReader changelogReader = new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000);
 
     @Before
     public void setUp() throws Exception {
@@ -169,7 +171,7 @@ public class StreamThreadTest {
         };
     }
 
-    private static class TestStreamTask extends StreamTask {
+    private class TestStreamTask extends StreamTask {
         boolean committed = false;
         private boolean suspended;
         private boolean closed;
@@ -181,20 +183,20 @@ public class StreamThreadTest {
                        final ProcessorTopology topology,
                        final Consumer<byte[], byte[]> consumer,
                        final Producer<byte[], byte[]> producer,
-                       final Consumer<byte[], byte[]> restoreConsumer,
                        final StreamsConfig config,
                        final StreamsMetrics metrics,
-                       final StateDirectory stateDirectory) {
+                       final StateDirectory stateDirectory,
+                       final ChangelogReader storeChangelogReader) {
             super(id,
                 applicationId,
                 partitions,
                 topology,
                 consumer,
-                new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000),
+                storeChangelogReader,
                 config,
                 metrics,
                 stateDirectory,
-                null,
+                new ThreadCache("", 0L, metrics),
                 new MockTime(),
                 producer);
         }
@@ -291,9 +293,10 @@ public class StreamThreadTest {
         activeTasks.put(new TaskId(0, 1), expectedGroup1);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
+        Assert.assertEquals(stateListener.numChanges, 3);
+        Assert.assertEquals(StreamThread.State.PARTITIONS_REVOKED, stateListener.oldState);
+        thread.runOnce(-1);
         assertEquals(thread.state(), StreamThread.State.RUNNING);
-        Assert.assertEquals(stateListener.numChanges, 4);
-        Assert.assertEquals(stateListener.oldState, StreamThread.State.ASSIGNING_PARTITIONS);
         assertTrue(thread.tasks().containsKey(task1));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
         assertEquals(1, thread.tasks().size());
@@ -311,7 +314,7 @@ public class StreamThreadTest {
         expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
         activeTasks.put(new TaskId(0, 2), expectedGroup2);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
+        thread.runOnce(-1);
         assertTrue(thread.tasks().containsKey(task2));
         assertEquals(expectedGroup2, thread.tasks().get(task2).partitions());
         assertEquals(1, thread.tasks().size());
@@ -326,7 +329,7 @@ public class StreamThreadTest {
         activeTasks.put(new TaskId(0, 1), expectedGroup1);
         activeTasks.put(new TaskId(0, 2), expectedGroup2);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
+        thread.runOnce(-1);
         assertTrue(thread.tasks().containsKey(task1));
         assertTrue(thread.tasks().containsKey(task2));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
@@ -338,7 +341,7 @@ public class StreamThreadTest {
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
         assignedPartitions = Collections.emptyList();
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
+        thread.runOnce(-1);
         assertTrue(thread.tasks().isEmpty());
 
         thread.close();
@@ -389,6 +392,7 @@ public class StreamThreadTest {
         activeTasks.put(new TaskId(1, 1), expectedGroup1);
         activeTasks.put(new TaskId(1, 2), expectedGroup2);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task4));
         assertTrue(thread.tasks().containsKey(task5));
@@ -404,8 +408,9 @@ public class StreamThreadTest {
         expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
         expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
         activeTasks.put(new TaskId(0, 1), expectedGroup1);
-        activeTasks.put(new TaskId(1, 1), expectedGroup2);
+        activeTasks.remove(task5);
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
         assertTrue(thread.tasks().containsKey(task4));
@@ -420,6 +425,7 @@ public class StreamThreadTest {
         expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
         expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1));
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
         assertTrue(thread.tasks().containsKey(task4));
@@ -432,6 +438,7 @@ public class StreamThreadTest {
         rebalanceListener.onPartitionsRevoked(revokedPartitions);
         assignedPartitions = Collections.emptyList();
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce(-1);
 
         assertTrue(thread.tasks().isEmpty());
 
@@ -494,7 +501,14 @@ public class StreamThreadTest {
         );
         builder.addSource("source", TOPIC);
 
-        //clientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
+        TopicPartition tp0 = new TopicPartition(TOPIC, 0);
+        TopicPartition tp1 = new TopicPartition(TOPIC, 1);
+        clientSupplier.consumer.assign(Arrays.asList(tp0, tp1));
+        final Map<TopicPartition, Long> offsets = new HashMap<>();
+        offsets.put(tp0, 0L);
+        offsets.put(tp1, 0L);
+        clientSupplier.consumer.updateBeginningOffsets(offsets);
+
 
         final StreamThread thread1 = new StreamThread(
             builder,
@@ -530,30 +544,17 @@ public class StreamThreadTest {
         thread1.setPartitionAssignor(new MockStreamsPartitionAssignor(thread1Assignment));
         thread2.setPartitionAssignor(new MockStreamsPartitionAssignor(thread2Assignment));
 
-
-        thread1.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return thread1.state() == StreamThread.State.RUNNING;
-            }
-        }, 10 * 1000, "Thread never started.");
-
-        thread2.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return thread2.state() == StreamThread.State.RUNNING;
-            }
-        }, 10 * 1000, "Thread never started.");
-
         // revoke (to get threads in correct state)
+        thread1.setState(StreamThread.State.RUNNING);
+        thread2.setState(StreamThread.State.RUNNING);
         thread1.rebalanceListener.onPartitionsRevoked(EMPTY_SET);
         thread2.rebalanceListener.onPartitionsRevoked(EMPTY_SET);
 
         // assign
         thread1.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread1.runOnce(-1);
         thread2.rebalanceListener.onPartitionsAssigned(task1Assignment);
+        thread2.runOnce(-1);
 
         final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>();
         originalTaskAssignmentThread1.addAll(thread1.tasks().keySet());
@@ -564,6 +565,8 @@ public class StreamThreadTest {
         thread1.rebalanceListener.onPartitionsRevoked(task0Assignment);
         thread2.rebalanceListener.onPartitionsRevoked(task1Assignment);
 
+        assertThat(thread1.prevActiveTasks(), equalTo(originalTaskAssignmentThread1));
+        assertThat(thread2.prevActiveTasks(), equalTo(originalTaskAssignmentThread2));
 
         // assign reverted
         thread1Assignment.clear();
@@ -576,18 +579,18 @@ public class StreamThreadTest {
             @Override
             public void run() {
                 thread1.rebalanceListener.onPartitionsAssigned(task1Assignment);
+                thread1.runOnce(-1);
             }
         });
         runIt.start();
 
         thread2.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread2.runOnce(-1);
 
         runIt.join();
 
         assertThat(thread1.tasks().keySet(), equalTo(originalTaskAssignmentThread2));
         assertThat(thread2.tasks().keySet(), equalTo(originalTaskAssignmentThread1));
-        assertThat(thread1.prevActiveTasks(), equalTo(originalTaskAssignmentThread1));
-        assertThat(thread2.prevActiveTasks(), equalTo(originalTaskAssignmentThread2));
     }
 
     private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
@@ -704,10 +707,10 @@ public class StreamThreadTest {
                         topology,
                         consumer,
                         clientSupplier.getProducer(new HashMap<String, Object>()),
-                        restoreConsumer,
                         config,
                         new MockStreamsMetrics(new Metrics()),
-                        stateDirectory);
+                        stateDirectory,
+                        storeChangelogReader);
                 }
             };
 
@@ -728,6 +731,8 @@ public class StreamThreadTest {
             rebalanceListener.onPartitionsRevoked(revokedPartitions);
             rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
+            thread.runOnce(-1);
+
             assertEquals(2, thread.tasks().size());
 
             // no task is committed before the commit interval
@@ -830,6 +835,8 @@ public class StreamThreadTest {
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
+        thread.runOnce(-1);
+
         assertNull(thread.threadProducer);
         assertEquals(thread.tasks().size(), clientSupplier.producers.size());
         final Iterator it = clientSupplier.producers.iterator();
@@ -940,16 +947,16 @@ public class StreamThreadTest {
         builder.addSource("name", "topic").addSink("out", "output");
 
         final TestStreamTask testStreamTask = new TestStreamTask(
-            new TaskId(0, 0),
-            applicationId,
-            Utils.mkSet(new TopicPartition("topic", 0)),
-            builder.build(0),
-            clientSupplier.consumer,
-            clientSupplier.getProducer(new HashMap<String, Object>()),
-            clientSupplier.restoreConsumer,
-            config,
-            new MockStreamsMetrics(new Metrics()),
-            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime));
+                new TaskId(0, 0),
+                applicationId,
+                Utils.mkSet(new TopicPartition("topic", 0)),
+                builder.build(0),
+                clientSupplier.consumer,
+                clientSupplier.getProducer(new HashMap<String, Object>()),
+                config,
+                new MockStreamsMetrics(new Metrics()),
+                new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime),
+                changelogReader);
 
         final StreamThread thread = new StreamThread(
             builder,
@@ -987,6 +994,7 @@ public class StreamThreadTest {
         thread.setState(StreamThread.State.RUNNING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
         thread.rebalanceListener.onPartitionsAssigned(activeTasks);
+        thread.runOnce(-1);
         thread.rebalanceListener.onPartitionsRevoked(activeTasks);
 
         assertTrue(testStreamTask.suspended);
@@ -1021,17 +1029,23 @@ public class StreamThreadTest {
 
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
-                                         Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
-                                                                                     0,
-                                                                                     null,
-                                                                                     new Node[0],
-                                                                                     new Node[0])));
+                Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
+                        0,
+                        null,
+                        new Node[0],
+                        new Node[0])));
         restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog",
-                                         Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog",
-                                                                                     0,
-                                                                                     null,
-                                                                                     new Node[0],
-                                                                                     new Node[0])));
+                Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog",
+                        0,
+                        null,
+                        new Node[0],
+                        new Node[0])));
+        final TopicPartition tp1 = new TopicPartition("stream-thread-test-count-one-changelog", 0);
+        final TopicPartition tp2 = new TopicPartition("stream-thread-test-count-two-changelog", 0);
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(tp1, 0L);
+        beginningOffsets.put(tp2, 0L);
+        restoreConsumer.updateBeginningOffsets(beginningOffsets);
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
         final TopicPartition t1 = new TopicPartition("t1", 0);
@@ -1047,16 +1061,16 @@ public class StreamThreadTest {
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
-
-        assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0))));
+        thread.runOnce(-1);
+        assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(tp1)));
 
         // assign an existing standby plus a new one
         standbyTasks.put(new TaskId(1, 0), Utils.mkSet(new TopicPartition("t2", 0)));
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
-
-        assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(new TopicPartition("stream-thread-test-count-one-changelog", 0),
-                                                                     new TopicPartition("stream-thread-test-count-two-changelog", 0))));
+        thread.runOnce(-1);
+        assertThat(restoreConsumer.assignment(), equalTo(Utils.mkSet(tp1,
+                                                                     tp2)));
     }
 
     @Test
@@ -1101,11 +1115,15 @@ public class StreamThreadTest {
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
         final TopicPartition t1 = new TopicPartition("t1", 0);
-        standbyTasks.put(new TaskId(0, 0), Utils.mkSet(t1));
+        Set<TopicPartition> partitionsT1 = Utils.mkSet(t1);
+        standbyTasks.put(new TaskId(0, 0), partitionsT1);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final TopicPartition t2 = new TopicPartition("t2", 0);
-        activeTasks.put(new TaskId(1, 0), Utils.mkSet(t2));
+        Set<TopicPartition> partitionsT2 = Utils.mkSet(t2);
+        activeTasks.put(new TaskId(1, 0), partitionsT2);
+
+        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t2, 0L));
 
         thread.setPartitionAssignor(new StreamPartitionAssignor() {
             @Override
@@ -1121,8 +1139,9 @@ public class StreamThreadTest {
 
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        clientSupplier.consumer.assign(partitionsT2);
         thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t2));
-
+        thread.runOnce(-1);
         // swap the assignment around and make sure we don't get any exceptions
         standbyTasks.clear();
         activeTasks.clear();
@@ -1130,6 +1149,7 @@ public class StreamThreadTest {
         activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1));
 
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        clientSupplier.consumer.assign(partitionsT1);
         thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t1));
     }
 
@@ -1158,16 +1178,16 @@ public class StreamThreadTest {
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 final ProcessorTopology topology = builder.build(id.topicGroupId);
                 final TestStreamTask task = new TestStreamTask(
-                    id,
-                    applicationId,
-                    partitions,
-                    topology,
-                    consumer,
-                    clientSupplier.getProducer(new HashMap<String, Object>()),
-                    restoreConsumer,
-                    config,
-                    new MockStreamsMetrics(new Metrics()),
-                    stateDirectory);
+                        id,
+                        applicationId,
+                        partitions,
+                        topology,
+                        consumer,
+                        clientSupplier.getProducer(new HashMap<String, Object>()),
+                        config,
+                        new MockStreamsMetrics(new Metrics()),
+                        stateDirectory,
+                        storeChangelogReader);
                 createdTasks.put(partitions, task);
                 return task;
             }
@@ -1198,6 +1218,7 @@ public class StreamThreadTest {
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
+        thread.runOnce(-1);
 
         final TestStreamTask firstTask = createdTasks.get(task00Partitions);
         assertThat(firstTask.id(), is(taskId));
@@ -1241,39 +1262,22 @@ public class StreamThreadTest {
         activeTasks.put(task1, task0Assignment);
 
         thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
-        thread.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return thread.state() == StreamThread.State.RUNNING;
-            }
-        }, 10 * 1000, "Thread never started.");
-
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(null);
         thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread.runOnce(-1);
+
         assertThat(thread.tasks().size(), equalTo(1));
         final MockProducer producer = clientSupplier.producers.get(0);
 
-        TestUtils.waitForCondition(
-            new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return !consumer.subscription().isEmpty();
-                }
-            },
-            "StreamsThread's internal consumer did not subscribe to input topic.");
-
         // change consumer subscription from "pattern" to "manual" to be able to call .addRecords()
-        consumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() {
-            {
-                put(task0Assignment.iterator().next(), 0L);
-            }
-        });
+        consumer.updateBeginningOffsets(Collections.singletonMap(task0Assignment.iterator().next(), 0L));
         consumer.unsubscribe();
         consumer.assign(task0Assignment);
 
         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0]));
         mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1);
+        thread.runOnce(-1);
         TestUtils.waitForCondition(
             new TestCondition() {
                 @Override
@@ -1296,8 +1300,8 @@ public class StreamThreadTest {
 
         producer.fenceProducer();
         mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L);
-
         consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0]));
+        thread.runOnce(-1);
         TestUtils.waitForCondition(
             new TestCondition() {
                 @Override
@@ -1307,9 +1311,6 @@ public class StreamThreadTest {
             },
             "StreamsThread did not remove fenced zombie task.");
 
-        thread.close();
-        thread.join();
-
         assertThat(producer.commitCount(), equalTo(1L));
     }
 
@@ -1338,6 +1339,7 @@ public class StreamThreadTest {
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(null);
         thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread.runOnce(-1);
         assertThat(thread.tasks().size(), equalTo(1));
 
         thread.rebalanceListener.onPartitionsRevoked(null);
@@ -1357,16 +1359,16 @@ public class StreamThreadTest {
         builder.stream("t1").groupByKey();
 
         final TestStreamTask testStreamTask = new TestStreamTask(
-            new TaskId(0, 0),
-            applicationId,
-            Utils.mkSet(new TopicPartition("t1", 0)),
-            builder.build(0),
-            clientSupplier.consumer,
-            clientSupplier.getProducer(new HashMap<String, Object>()),
-            clientSupplier.restoreConsumer,
-            config,
-            new MockStreamsMetrics(new Metrics()),
-            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) {
+                new TaskId(0, 0),
+                applicationId,
+                Utils.mkSet(new TopicPartition("t1", 0)),
+                builder.build(0),
+                clientSupplier.consumer,
+                clientSupplier.getProducer(new HashMap<String, Object>()),
+                config,
+                new MockStreamsMetrics(new Metrics()),
+                new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime),
+                changelogReader) {
 
             @Override
             public void close(final boolean clean) {
@@ -1410,17 +1412,18 @@ public class StreamThreadTest {
     public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception {
         final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("foo", false);
         builder.stream("t1").groupByKey().count(new MockStateStoreSupplier(stateStore));
+        TopicPartition t1 = new TopicPartition("t1", 0);
         final TestStreamTask testStreamTask = new TestStreamTask(
-            new TaskId(0, 0),
-            applicationId,
-            Utils.mkSet(new TopicPartition("t1", 0)),
-            builder.build(0),
-            clientSupplier.consumer,
-            clientSupplier.getProducer(new HashMap<String, Object>()),
-            clientSupplier.restoreConsumer,
-            config,
-            new MockStreamsMetrics(metrics),
-            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) {
+                new TaskId(0, 0),
+                applicationId,
+                Utils.mkSet(t1),
+                builder.build(0),
+                clientSupplier.consumer,
+                clientSupplier.getProducer(new HashMap<String, Object>()),
+                config,
+                new MockStreamsMetrics(metrics),
+                new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime),
+                changelogReader) {
 
             @Override
             public void flushState() {
@@ -1453,20 +1456,17 @@ public class StreamThreadTest {
 
 
         thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
-        thread.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return thread.state() == StreamThread.State.RUNNING;
-            }
-        }, 10 * 1000, "Thread never started.");
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        clientSupplier.consumer.assign(testStreamTask.partitions);
+        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1, 0L));
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+        thread.runOnce(-1);
         // store should have been opened
         assertTrue(stateStore.isOpen());
 
-        thread.close();
-        thread.join();
+        thread.shutdown(true);
+
         assertFalse("task shouldn't have been committed as there was an exception during shutdown", testStreamTask.committed);
         // store should be closed even if we had an exception
         assertFalse(stateStore.isOpen());
@@ -1483,10 +1483,10 @@ public class StreamThreadTest {
                 builder.build(0),
                 clientSupplier.consumer,
                 clientSupplier.getProducer(new HashMap<String, Object>()),
-                clientSupplier.restoreConsumer,
                 config,
                 new MockStreamsMetrics(new Metrics()),
-                new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) {
+                new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime),
+                changelogReader) {
 
             @Override
             public void suspend() {
@@ -1520,7 +1520,7 @@ public class StreamThreadTest {
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
-
+        thread.runOnce(-1);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
 
         assertFalse(testStreamTask.committed);
@@ -1534,16 +1534,16 @@ public class StreamThreadTest {
         builder.stream("t1").groupByKey();
 
         final TestStreamTask testStreamTask = new TestStreamTask(
-            new TaskId(0, 0),
-            applicationId,
-            Utils.mkSet(new TopicPartition("t1", 0)),
-            builder.build(0),
-            clientSupplier.consumer,
-            clientSupplier.getProducer(new HashMap<String, Object>()),
-            clientSupplier.restoreConsumer,
-            config,
-            new MockStreamsMetrics(new Metrics()),
-            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) {
+                new TaskId(0, 0),
+                applicationId,
+                Utils.mkSet(new TopicPartition("t1", 0)),
+                builder.build(0),
+                clientSupplier.consumer,
+                clientSupplier.getProducer(new HashMap<String, Object>()),
+                config,
+                new MockStreamsMetrics(new Metrics()),
+                new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime),
+                changelogReader) {
 
             @Override
             public void suspend() {
@@ -1579,6 +1579,7 @@ public class StreamThreadTest {
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+        thread.runOnce(-1);
         try {
             thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             fail("should have thrown exception");
@@ -1595,16 +1596,16 @@ public class StreamThreadTest {
         builder.stream("t1").groupByKey();
 
         final TestStreamTask testStreamTask = new TestStreamTask(
-            new TaskId(0, 0),
-            applicationId,
-            Utils.mkSet(new TopicPartition("t1", 0)),
-            builder.build(0),
-            clientSupplier.consumer,
-            clientSupplier.getProducer(new HashMap<String, Object>()),
-            clientSupplier.restoreConsumer,
-            config,
-            new MockStreamsMetrics(new Metrics()),
-            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) {
+                new TaskId(0, 0),
+                applicationId,
+                Utils.mkSet(new TopicPartition("t1", 0)),
+                builder.build(0),
+                clientSupplier.consumer,
+                clientSupplier.getProducer(new HashMap<String, Object>()),
+                config,
+                new MockStreamsMetrics(new Metrics()),
+                new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime),
+                changelogReader) {
 
             @Override
             protected void flushState() {
@@ -1640,6 +1641,7 @@ public class StreamThreadTest {
         thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+        thread.runOnce(-1);
         try {
             thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             fail("should have thrown exception");
@@ -1763,35 +1765,31 @@ public class StreamThreadTest {
         final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
 
         final StreamThread thread = setupTest(taskId, stateDirMock);
+        thread.start();
         thread.close();
         thread.join();
         EasyMock.verify(stateDirMock);
     }
 
 
-    private StreamThread setupTest(final TaskId taskId, final StateDirectory stateDirectory) throws InterruptedException {
-        final TopologyBuilder builder = new TopologyBuilder();
+    private StreamThread setupTest(final TaskId taskId, final StateDirectory stateDirectory) {
+        final KStreamBuilder builder = new KStreamBuilder();
+
         builder.setApplicationId(applicationId);
-        builder.addSource("source", "topic");
+        builder.table("topic", "topic");
 
         final MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        final TestStreamTask testStreamTask = new TestStreamTask(taskId,
-            applicationId,
-            Utils.mkSet(new TopicPartition("topic", 0)),
-            builder.build(0),
-            clientSupplier.consumer,
-            clientSupplier.getProducer(new HashMap<String, Object>()),
-            clientSupplier.restoreConsumer,
-            config,
-            new MockStreamsMetrics(new Metrics()),
-            stateDirectory) {
-
-            @Override
-            public void suspend() {
-                throw new RuntimeException("KABOOM!!!");
-            }
-        };
+        final TopicPartition topic = new TopicPartition("topic", 0);
+        final Set<TopicPartition> partitions = Utils.mkSet(topic);
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(topic, 0L);
+        clientSupplier.restoreConsumer.updatePartitions("topic",
+                                                        Collections.singletonList(
+                                                                new PartitionInfo("topic", 0, null, null, null)));
+        clientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
+        clientSupplier.restoreConsumer.updateEndOffsets(offsets);
+        clientSupplier.consumer.assign(partitions);
+        clientSupplier.consumer.updateBeginningOffsets(offsets);
 
         final StreamThread thread = new StreamThread(
             builder,
@@ -1807,23 +1805,33 @@ public class StreamThreadTest {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
-                return testStreamTask;
+                return new TestStreamTask(taskId,
+                                          applicationId,
+                                          partitions,
+                                          builder.build(0),
+                                          clientSupplier.consumer,
+                                          clientSupplier.getProducer(new HashMap<String, Object>()),
+                                          config,
+                                          new MockStreamsMetrics(new Metrics()),
+                                          stateDirectory,
+                                          storeChangelogReader) {
+
+                    @Override
+                    public void suspend() {
+                        throw new RuntimeException("KABOOM!!!");
+                    }
+                };
+
             }
         };
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
-        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
+        activeTasks.put(taskId, partitions);
         thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
-        thread.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return thread.state() == StreamThread.State.RUNNING;
-            }
-        }, "thread didn't transition to running");
+        thread.setState(StreamThread.State.RUNNING);
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
-        thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
-
+        thread.rebalanceListener.onPartitionsAssigned(partitions);
+        thread.runOnce(-1);
         return thread;
     }
 
@@ -1833,8 +1841,6 @@ public class StreamThreadTest {
         final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
         final StreamThread thread = setupStandbyTest(taskId, stateDirMock);
 
-        startThreadAndRebalance(thread);
-
         try {
             thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             fail("Should have thrown exception");
@@ -1846,30 +1852,15 @@ public class StreamThreadTest {
         EasyMock.verify(stateDirMock);
     }
 
-    private void startThreadAndRebalance(final StreamThread thread) throws InterruptedException {
-        thread.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return thread.state() == StreamThread.State.RUNNING;
-            }
-        }, "thread didn't transition to running");
-        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
-        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
-    }
-
     @Test
     public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
         final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
         final StreamThread thread = setupStandbyTest(taskId, stateDirMock);
-        startThreadAndRebalance(thread);
-        try {
-            thread.close();
-            thread.join();
-        } finally {
-            thread.close();
-        }
+
+        thread.close();
+        thread.shutdown(true);
+
         EasyMock.verify(stateDirMock);
     }
 
@@ -1947,6 +1938,10 @@ public class StreamThreadTest {
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
         standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0)));
         thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks));
+        thread.setState(StreamThread.State.RUNNING);
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
+        thread.runOnce(-1);
         return thread;
     }
 
@@ -2003,16 +1998,15 @@ public class StreamThreadTest {
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
                 final ProcessorTopology topology = builder.build(id.topicGroupId);
                 return new TestStreamTask(
-                    id,
-                    applicationId,
-                    partitionsForTask,
-                    topology,
-                    consumer,
-                    clientSupplier.getProducer(new HashMap()),
-                    restoreConsumer,
-                    config,
-                    new MockStreamsMetrics(new Metrics()),
-                    stateDirectory);
+                        id,
+                        applicationId,
+                        partitionsForTask,
+                        topology,
+                        consumer,
+                        clientSupplier.getProducer(new HashMap()),
+                        config,
+                        new MockStreamsMetrics(new Metrics()),
+                        stateDirectory, storeChangelogReader);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index f04f80a..549c67c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -103,10 +103,12 @@ public class StreamThreadStateStoreProviderTest {
         stateDirectory = new StateDirectory(applicationId, stateConfigDir, new MockTime());
         taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 0));
+        taskOne.initialize();
         tasks.put(new TaskId(0, 0),
                   taskOne);
         taskTwo = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 1));
+        taskTwo.initialize();
         tasks.put(new TaskId(0, 1),
                   taskTwo);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
index 4db10e7..86c0eb5 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.internals.ChangelogReader;
 import org.apache.kafka.streams.processor.internals.StateRestorer;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -39,8 +40,8 @@ public class MockChangelogReader implements ChangelogReader {
     }
 
     @Override
-    public void restore() {
-
+    public Collection<TopicPartition> restore() {
+        return registered;
     }
 
     @Override
@@ -48,6 +49,11 @@ public class MockChangelogReader implements ChangelogReader {
         return Collections.emptyMap();
     }
 
+    @Override
+    public void reset() {
+        registered.clear();
+    }
+
     public boolean wasRegistered(final TopicPartition partition) {
         return registered.contains(partition);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b268322e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index a9f020b..20ab3f1 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -225,6 +225,7 @@ public class ProcessorTopologyTestDriver {
                                   cache,
                                   new MockTime(),
                                   producer);
+            task.initialize();
         }
     }
 


Mime
View raw message