kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/6] kafka git commit: KIP-28: Add a processor client for Kafka Streaming
Date Sat, 26 Sep 2015 00:24:21 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
new file mode 100644
index 0000000..0a1f95c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class MinTimestampTrackerTest {
+
+    private Stamped<String> elem(long timestamp) {
+        return new Stamped<>("", timestamp);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testTracking() {
+        TimestampTracker<String> tracker = new MinTimestampTracker<>();
+
+        Object[] elems = new Object[]{
+            elem(100), elem(101), elem(102), elem(98), elem(99), elem(100)
+        };
+
+        int insertionIndex = 0;
+        int removalIndex = 0;
+
+        // add 100
+        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
+        assertEquals(100L, tracker.get());
+
+        // add 101
+        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
+        assertEquals(100L, tracker.get());
+
+        // remove 100
+        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
+        assertEquals(101L, tracker.get());
+
+        // add 102
+        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
+        assertEquals(101L, tracker.get());
+
+        // add 98
+        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
+        assertEquals(98L, tracker.get());
+
+        // add 99
+        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
+        assertEquals(98L, tracker.get());
+
+        // add 100
+        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
+        assertEquals(98L, tracker.get());
+
+        // remove 101
+        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
+        assertEquals(98L, tracker.get());
+
+        // remove 102
+        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
+        assertEquals(98L, tracker.get());
+
+        // remove 98
+        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
+        assertEquals(99L, tracker.get());
+
+        // remove 99
+        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
+        assertEquals(100L, tracker.get());
+
+        // remove 100
+        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
+        assertEquals(100L, tracker.get());
+
+        assertEquals(insertionIndex, removalIndex);
+    }
+
+}
\ No newline at end of file
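
The MinTimestampTracker implementation itself is added elsewhere in this patch series. For readers following only this message, a minimal sketch that satisfies the assertions above could look like the following. It assumes the patch's Stamped<E> exposes a public timestamp field (as the use of StampedRecord.timestamp in the other tests in this message suggests) and is written as a standalone class rather than as an implementation of the patch's TimestampTracker interface; the committed class may differ.

    import java.util.LinkedList;

    // Sketch only: keep an ascending subsequence of the buffered elements so that
    // get() returns the minimum timestamp in O(1). Removals happen in insertion
    // order, so only the head can ever match. Once the deque becomes empty, the
    // timestamp of the last removed element is retained, matching the final
    // assertion in the test above.
    public class MinTimestampTrackerSketch<E> {

        public static final long NOT_KNOWN = -1L;

        private final LinkedList<Stamped<E>> ascendingSubsequence = new LinkedList<>();
        private long lastKnownTime = NOT_KNOWN;

        public void addElement(Stamped<E> elem) {
            // drop buffered candidates that can never be the minimum again
            while (!ascendingSubsequence.isEmpty()
                    && ascendingSubsequence.peekLast().timestamp >= elem.timestamp) {
                ascendingSubsequence.removeLast();
            }
            ascendingSubsequence.offerLast(elem);
        }

        public void removeElement(Stamped<E> elem) {
            if (ascendingSubsequence.peekFirst() == elem) {
                ascendingSubsequence.removeFirst();
            }
            if (ascendingSubsequence.isEmpty()) {
                lastKnownTime = elem.timestamp;
            }
        }

        public long get() {
            Stamped<E> head = ascendingSubsequence.peekFirst();
            return head == null ? lastKnownTime : head.timestamp;
        }
    }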

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
new file mode 100644
index 0000000..b91acdc
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.test.MockSourceNode;
+import org.apache.kafka.test.MockTimestampExtractor;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+public class PartitionGroupTest {
+    private final Serializer<Integer> intSerializer = new IntegerSerializer();
+    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
+    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
+    private final TopicPartition partition1 = new TopicPartition("topic", 1);
+    private final TopicPartition partition2 = new TopicPartition("topic", 2);
+    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(intDeserializer, intDeserializer));
+    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(intDeserializer, intDeserializer));
+
+    private final byte[] recordValue = intSerializer.serialize(null, 10);
+    private final byte[] recordKey = intSerializer.serialize(null, 1);
+
+    private final PartitionGroup group = new PartitionGroup(new HashMap<TopicPartition, RecordQueue>() {
+        {
+            put(partition1, queue1);
+            put(partition2, queue2);
+        }
+    }, timestampExtractor);
+
+    @Test
+    public void testTimeTracking() {
+        assertEquals(0, group.numBuffered());
+
+        // add three records with timestamps 1, 3, 5 to partition-1
+        List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue));
+
+        group.addRawRecords(partition1, list1);
+
+        // add three records with timestamps 2, 4, 6 to partition-2
+        List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue));
+
+        group.addRawRecords(partition2, list2);
+
+        assertEquals(6, group.numBuffered());
+        assertEquals(3, group.numBuffered(partition1));
+        assertEquals(3, group.numBuffered(partition2));
+        assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp());
+
+        StampedRecord record;
+        PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
+
+        // get one record
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(1L, record.timestamp);
+        assertEquals(5, group.numBuffered());
+        assertEquals(2, group.numBuffered(partition1));
+        assertEquals(3, group.numBuffered(partition2));
+        assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp());
+
+        // get one record, now the time should be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition2, info.partition());
+        assertEquals(2L, record.timestamp);
+        assertEquals(4, group.numBuffered());
+        assertEquals(2, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+    }
+}
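
Both this test and RecordQueueTest below rely on MockTimestampExtractor to turn the offsets passed to the ConsumerRecord constructors (1, 3, 5, ...) into the timestamps the comments refer to. That helper lives under the test sources and is not shown in this message; a plausible shape for it, inferred from its usage rather than copied from the patch, is simply:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Sketch: report the record's offset as its timestamp, so a test can choose
    // timestamps just by choosing offsets when building ConsumerRecords.
    public class MockTimestampExtractorSketch implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            return record.offset();
        }
    }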

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
new file mode 100644
index 0000000..343ed52
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileLock;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class ProcessorStateManagerTest {
+
+    private static class MockStateStore implements StateStore {
+        private final String name;
+        private final boolean persistent;
+
+        public boolean flushed = false;
+        public boolean closed = false;
+        public final ArrayList<Integer> keys = new ArrayList<>();
+
+        public MockStateStore(String name, boolean persistent) {
+            this.name = name;
+            this.persistent = persistent;
+        }
+        @Override
+        public String name() {
+            return name;
+        }
+        @Override
+        public void flush() {
+            flushed = true;
+        }
+        @Override
+        public void close() {
+            closed = true;
+        }
+        @Override
+        public boolean persistent() {
+            return persistent;
+        }
+
+        public final RestoreFunc restoreFunc = new RestoreFunc() {
+            private final Deserializer<Integer> deserializer = new IntegerDeserializer();
+
+            @Override
+            public void apply(byte[] key, byte[] value) {
+                keys.add(deserializer.deserialize("", key));
+            }
+        };
+    }
+
+    private class MockRestoreConsumer  extends MockConsumer<byte[], byte[]> {
+        private final Serializer<Integer> serializer = new IntegerSerializer();
+
+        public TopicPartition assignedPartition = null;
+        public TopicPartition seekPartition = null;
+        public long seekOffset = -1L;
+        public boolean seekToBeginingCalled = false;
+        public boolean seekToEndCalled = false;
+        private long endOffset = 0L;
+        private long currentOffset = 0L;
+
+        private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList<>();
+
+        MockRestoreConsumer() {
+            super(OffsetResetStrategy.EARLIEST);
+
+            reset();
+        }
+
+        // reset this mock restore consumer for a state store registration
+        public void reset() {
+            assignedPartition = null;
+            seekOffset = -1L;
+            seekToBeginingCalled = false;
+            seekToEndCalled = false;
+            endOffset = 0L;
+            recordBuffer.clear();
+        }
+
+        // buffer a record (we cannot use addRecord because we need to add records before assigning a partition)
+        public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
+            recordBuffer.add(
+                new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
+                    serializer.serialize(record.topic(), record.key()),
+                    serializer.serialize(record.topic(), record.value())));
+            endOffset = record.offset();
+
+            super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset));
+        }
+
+        @Override
+        public synchronized void assign(List<TopicPartition> partitions) {
+            int numPartitions = partitions.size();
+            if (numPartitions > 1)
+                throw new IllegalArgumentException("RestoreConsumer: more than one partition specified");
+
+            if (numPartitions == 1) {
+                if (assignedPartition != null)
+                    throw new IllegalStateException("RestoreConsumer: partition already assigned");
+                assignedPartition = partitions.get(0);
+
+                // set the beginning offset to 0
+                // NOTE: it is the user's responsibility to set the initial LEO (log end offset).
+                super.updateBeginningOffsets(Collections.singletonMap(assignedPartition, 0L));
+            }
+
+            super.assign(partitions);
+        }
+
+        @Override
+        public ConsumerRecords<byte[], byte[]> poll(long timeout) {
+            // add buffered records to MockConsumer
+            for (ConsumerRecord<byte[], byte[]> record : recordBuffer) {
+                super.addRecord(record);
+            }
+            recordBuffer.clear();
+
+            ConsumerRecords<byte[], byte[]> records = super.poll(timeout);
+
+            // set the current offset
+            Iterable<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(assignedPartition);
+            for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+                currentOffset = record.offset();
+            }
+
+            return records;
+        }
+
+        @Override
+        public synchronized long position(TopicPartition partition) {
+            if (!partition.equals(assignedPartition))
+                throw new IllegalStateException("RestoreConsumer: unassigned partition");
+
+            return currentOffset;
+        }
+
+        @Override
+        public synchronized void seek(TopicPartition partition, long offset) {
+            if (offset < 0)
+                throw new IllegalArgumentException("RestoreConsumer: offset should not be negative");
+
+            if (seekOffset >= 0)
+                throw new IllegalStateException("RestoreConsumer: offset already seeked");
+
+            seekPartition = partition;
+            seekOffset = offset;
+            currentOffset = offset;
+            super.seek(partition, offset);
+        }
+
+        @Override
+        public synchronized void seekToBeginning(TopicPartition... partitions) {
+            if (partitions.length != 1)
+                throw new IllegalStateException("RestoreConsumer: other than one partition specified");
+
+            for (TopicPartition partition : partitions) {
+                if (!partition.equals(assignedPartition))
+                    throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition");
+            }
+
+            seekToBeginingCalled = true;
+            currentOffset = 0L;
+        }
+
+        @Override
+        public synchronized void seekToEnd(TopicPartition... partitions) {
+            if (partitions.length != 1)
+                throw new IllegalStateException("RestoreConsumer: other than one partition specified");
+
+            for (TopicPartition partition : partitions) {
+                if (!partition.equals(assignedPartition))
+                    throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition");
+            }
+
+            seekToEndCalled = true;
+            currentOffset = endOffset;
+        }
+    }
+
+    @Test
+    public void testLockStateDirectory() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            FileLock lock;
+
+            // the state manager locks the directory
+            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
+
+            try {
+                // this should not get the lock
+                lock = ProcessorStateManager.lockStateDirectory(baseDir);
+                assertNull(lock);
+            } finally {
+                // by closing the state manager, release the lock
+                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            }
+
+            // now, this should get the lock
+            lock = ProcessorStateManager.lockStateDirectory(baseDir);
+            try {
+                assertNotNull(lock);
+            } finally {
+                if (lock != null) lock.release();
+            }
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testNoTopic() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            MockStateStore mockStateStore = new MockStateStore("mockStore", false);
+
+            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
+            try {
+                stateMgr.register(mockStateStore, mockStateStore.restoreFunc);
+            } finally {
+                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            }
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testRegisterPersistentStore() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            long lastCheckpointedOffset = 10L;
+            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+            checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
+
+            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+            restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
+                    new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
+                    new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0])
+            ));
+            restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
+
+            MockStateStore persistentStore = new MockStateStore("persistentStore", false); // non persistent store
+
+            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
+            try {
+                restoreConsumer.reset();
+
+                ArrayList<Integer> expectedKeys = new ArrayList<>();
+                for (int i = 1; i <= 3; i++) {
+                    long offset = (long) i;
+                    int key = i * 10;
+                    expectedKeys.add(key);
+                    restoreConsumer.bufferRecord(
+                            new ConsumerRecord<>("persistentStore", 2, offset, key, 0)
+                    );
+                }
+
+                stateMgr.register(persistentStore, persistentStore.restoreFunc);
+
+                assertEquals(new TopicPartition("persistentStore", 2), restoreConsumer.assignedPartition);
+                assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset);
+                assertFalse(restoreConsumer.seekToBeginingCalled);
+                assertTrue(restoreConsumer.seekToEndCalled);
+                assertEquals(expectedKeys, persistentStore.keys);
+
+            } finally {
+                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testRegisterNonPersistentStore() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            long lastCheckpointedOffset = 10L;
+            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+            checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
+
+            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+            restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
+                    new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
+                    new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0])
+            ));
+            restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
+
+            MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", true); // persistent store
+
+            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
+            try {
+                restoreConsumer.reset();
+
+                ArrayList<Integer> expectedKeys = new ArrayList<>();
+                for (int i = 1; i <= 3; i++) {
+                    long offset = (long) (i + 100);
+                    int key = i;
+                    expectedKeys.add(i);
+                    restoreConsumer.bufferRecord(
+                            new ConsumerRecord<>("nonPersistentStore", 2, offset, key, 0)
+                    );
+                }
+
+                stateMgr.register(nonPersistentStore, nonPersistentStore.restoreFunc);
+
+                assertEquals(new TopicPartition("nonPersistentStore", 2), restoreConsumer.assignedPartition);
+                assertEquals(0L, restoreConsumer.seekOffset);
+                assertTrue(restoreConsumer.seekToBeginingCalled);
+                assertTrue(restoreConsumer.seekToEndCalled);
+                assertEquals(expectedKeys, nonPersistentStore.keys);
+            } finally {
+                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            }
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testGetStore() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+            restoreConsumer.updatePartitions("mockStore", Arrays.asList(
+                    new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
+            ));
+
+            MockStateStore mockStateStore = new MockStateStore("mockStore", false);
+
+            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
+            try {
+                stateMgr.register(mockStateStore, mockStateStore.restoreFunc);
+
+                assertNull(stateMgr.getStore("noSuchStore"));
+                assertEquals(mockStateStore, stateMgr.getStore("mockStore"));
+
+            } finally {
+                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            }
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testClose() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        File checkpointFile = new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        try {
+            // write an empty checkpoint file
+            OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
+            oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
+
+            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+            restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
+                    new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0])
+            ));
+            restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
+                    new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0])
+            ));
+
+            // set up ack'ed offsets
+            HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
+            ackedOffsets.put(new TopicPartition("persistentStore", 1), 123L);
+            ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L);
+            ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L);
+
+            MockStateStore persistentStore = new MockStateStore("persistentStore", true);
+            MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", false);
+
+            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
+            try {
+                // make sure the checkpoint file is deleted
+                assertFalse(checkpointFile.exists());
+
+                restoreConsumer.reset();
+                stateMgr.register(persistentStore, persistentStore.restoreFunc);
+
+                restoreConsumer.reset();
+                stateMgr.register(nonPersistentStore, nonPersistentStore.restoreFunc);
+            } finally {
+                // close the state manager with the ack'ed offsets
+                stateMgr.close(ackedOffsets);
+            }
+
+            // make sure all stores are closed, and the checkpoint file is written.
+            assertTrue(persistentStore.flushed);
+            assertTrue(persistentStore.closed);
+            assertTrue(nonPersistentStore.flushed);
+            assertTrue(nonPersistentStore.closed);
+            assertTrue(checkpointFile.exists());
+
+            // the checkpoint file should contain an offset from the persistent store only.
+            OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
+            Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
+            assertEquals(1, checkpointedOffsets.size());
+            assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition("persistentStore", 1)));
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+}
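
The testClose assertions pin down the checkpoint semantics: on close() the state manager writes ackedOffset + 1 for persistent stores only, i.e. the first offset that has not yet been applied to the store and therefore the offset to resume restoring from. A small stand-alone round trip of the OffsetCheckpoint API as it is exercised above (the directory below is a placeholder):

    import java.io.File;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
    import org.apache.kafka.streams.state.OffsetCheckpoint;

    public class OffsetCheckpointExample {
        public static void main(String[] args) throws IOException {
            File stateDir = new File("/tmp/kafka-streaming-state");   // placeholder state directory
            stateDir.mkdirs();

            // the last applied changelog offset was 123, so checkpoint 124 (the next offset to read)
            TopicPartition storePartition = new TopicPartition("persistentStore", 1);
            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
            checkpoint.write(Collections.singletonMap(storePartition, 123L + 1L));

            // on the next start the state manager can read this map back and seek the
            // restore consumer to the recorded offset instead of replaying from the beginning
            Map<TopicPartition, Long> offsets = checkpoint.read();
            System.out.println(offsets);   // single entry mapping the store partition to 124
        }
    }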

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
new file mode 100644
index 0000000..1abb989
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorDef;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.MockProcessorDef;
+import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Properties;
+
+public class ProcessorTopologyTest {
+
+    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+    private static final File STATE_DIR = new File("build/data").getAbsoluteFile();
+
+    protected static final String INPUT_TOPIC = "input-topic";
+    protected static final String OUTPUT_TOPIC_1 = "output-topic-1";
+    protected static final String OUTPUT_TOPIC_2 = "output-topic-2";
+
+    private static long timestamp = 1000L;
+
+    private ProcessorTopologyTestDriver driver;
+    private StreamingConfig config;
+
+    @Before
+    public void setup() {
+        STATE_DIR.mkdirs();
+        Properties props = new Properties();
+        props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+        props.setProperty(StreamingConfig.STATE_DIR_CONFIG, STATE_DIR.getAbsolutePath());
+        props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+        props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        this.config = new StreamingConfig(props);
+    }
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+        if (STATE_DIR.exists()) {
+            try {
+                Files.walkFileTree(STATE_DIR.toPath(), new SimpleFileVisitor<Path>() {
+                    @Override
+                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                        Files.delete(file);
+                        return FileVisitResult.CONTINUE;
+                    }
+    
+                    @Override
+                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                        Files.delete(dir);
+                        return FileVisitResult.CONTINUE;
+                    }
+    
+                });
+            } catch (IOException e) {
+                // do nothing
+            }
+        }
+    }
+
+    @Test
+    public void testTopologyMetadata() {
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        builder.addSource("source-1", "topic-1");
+        builder.addSource("source-2", "topic-2", "topic-3");
+        builder.addProcessor("processor-1", new MockProcessorDef(), "source-1");
+        builder.addProcessor("processor-2", new MockProcessorDef(), "source-1", "source-2");
+        builder.addSink("sink-1", "topic-3", "processor-1");
+        builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
+
+        final ProcessorTopology topology = builder.build();
+
+        assertEquals(6, topology.processors().size());
+
+        assertEquals(2, topology.sources().size());
+
+        assertEquals(3, topology.sourceTopics().size());
+
+        assertNotNull(topology.source("topic-1"));
+
+        assertNotNull(topology.source("topic-2"));
+
+        assertNotNull(topology.source("topic-3"));
+
+        assertEquals(topology.source("topic-2"), topology.source("topic-3"));
+    }
+
+    @Test
+    public void testDrivingSimpleTopology() {
+        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology());
+        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1");
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+
+        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2");
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+
+        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5");
+    }
+
+    @Test
+    public void testDrivingMultiplexingTopology() {
+        driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology());
+        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
+
+        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
+
+        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)");
+    }
+
+    @Test
+    public void testDrivingStatefulTopology() {
+        String storeName = "entries";
+        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
+        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+
+        KeyValueStore<String, String> store = driver.getKeyValueStore("entries");
+        assertEquals("value4", store.get("key1"));
+        assertEquals("value2", store.get("key2"));
+        assertEquals("value3", store.get("key3"));
+        assertNull(store.get("key4"));
+    }
+
+    protected void assertNextOutputRecord(String topic, String key, String value) {
+        assertProducerRecord(driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER), topic, key, value);
+    }
+
+    protected void assertNoOutputRecord(String topic) {
+        assertNull(driver.readOutput(topic));
+    }
+
+    private void assertProducerRecord(ProducerRecord<String, String> record, String topic, String key, String value) {
+        assertEquals(topic, record.topic());
+        assertEquals(key, record.key());
+        assertEquals(value, record.value());
+        // Kafka Streaming doesn't set the partition, so it's always null
+        assertNull(record.partition());
+    }
+
+    protected TopologyBuilder createSimpleTopology() {
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+                                    .addProcessor("processor", define(new ForwardingProcessor()), "source")
+                                    .addSink("sink", OUTPUT_TOPIC_1, "processor");
+    }
+
+    protected TopologyBuilder createMultiplexingTopology() {
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+                                    .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
+                                    .addSink("sink1", OUTPUT_TOPIC_1, "processor")
+                                    .addSink("sink2", OUTPUT_TOPIC_2, "processor");
+    }
+
+    protected TopologyBuilder createStatefulTopology(String storeName) {
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+                                    .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
+                                    .addSink("counts", OUTPUT_TOPIC_1, "processor");
+    }
+
+    /**
+     * A processor that simply forwards all messages to all children.
+     */
+    protected static class ForwardingProcessor extends AbstractProcessor<String, String> {
+
+        @Override
+        public void process(String key, String value) {
+            context().forward(key, value);
+        }
+
+        @Override
+        public void punctuate(long streamTime) {
+            context().forward(Long.toString(streamTime), "punctuate");
+        }
+    }
+
+    /**
+     * A processor that forwards slightly-modified messages to each child.
+     */
+    protected static class MultiplexingProcessor extends AbstractProcessor<String, String> {
+
+        private final int numChildren;
+
+        public MultiplexingProcessor(int numChildren) {
+            this.numChildren = numChildren;
+        }
+
+        @Override
+        public void process(String key, String value) {
+            for (int i = 0; i != numChildren; ++i) {
+                context().forward(key, value + "(" + (i + 1) + ")", i);
+            }
+        }
+
+        @Override
+        public void punctuate(long streamTime) {
+            for (int i = 0; i != numChildren; ++i) {
+                context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i);
+            }
+        }
+    }
+
+    /**
+     * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When
+     * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
+     */
+    protected static class StatefulProcessor extends AbstractProcessor<String, String> {
+
+        private KeyValueStore<String, String> store;
+        private final String storeName;
+
+        public StatefulProcessor(String storeName) {
+            this.storeName = storeName;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            store = new InMemoryKeyValueStore<>(storeName, context);
+        }
+
+        @Override
+        public void process(String key, String value) {
+            store.put(key, value);
+        }
+
+        @Override
+        public void punctuate(long streamTime) {
+            int count = 0;
+            for (KeyValueIterator<String, String> iter = store.all(); iter.hasNext();) {
+                iter.next();
+                ++count;
+            }
+            context().forward(Long.toString(streamTime), count);
+        }
+    }
+
+    protected ProcessorDef define(final Processor processor) {
+        return new ProcessorDef() {
+            @Override
+            public Processor instance() {
+                return processor;
+            }
+        };
+    }
+
+    public static class CustomTimestampExtractor implements TimestampExtractor {
+        @Override
+        public long extract(ConsumerRecord<Object, Object> record) {
+            return timestamp;
+        }
+    }
+}
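
The processors above extend AbstractProcessor, whose body appears elsewhere in this patch. Judging from how it is used here (subclasses call super.init(context) and then reach the context through context()), it is essentially a convenience base class along the following lines; treat this as an inference from usage, not the committed code:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Sketch: capture the ProcessorContext in init() and expose it via context(),
    // with no-op defaults for the callbacks a simple processor may not need.
    public abstract class AbstractProcessorSketch<K, V> implements Processor<K, V> {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;   // subclasses call super.init(context) to keep this reference
        }

        protected final ProcessorContext context() {
            return context;
        }

        @Override
        public void punctuate(long streamTime) {
            // no-op unless a subclass acts on stream-time advances
        }

        @Override
        public void close() {
            // no-op by default
        }
    }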

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
new file mode 100644
index 0000000..b1403bd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+public class PunctuationQueueTest {
+
+    @Test
+    public void testPunctuationInterval() {
+        TestProcessor processor = new TestProcessor();
+        ProcessorNode<String, String> node = new ProcessorNode<>("test", processor);
+        PunctuationQueue queue = new PunctuationQueue();
+
+        PunctuationSchedule sched = new PunctuationSchedule(node, 100L);
+        final long now = sched.timestamp - 100L;
+
+        queue.schedule(sched);
+
+        Punctuator punctuator = new Punctuator() {
+            public void punctuate(ProcessorNode node, long time) {
+                node.processor().punctuate(time);
+            }
+        };
+
+        queue.mayPunctuate(now, punctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 99L, punctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 100L, punctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 199L, punctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 200L, punctuator);
+        assertEquals(2, processor.punctuatedAt.size());
+    }
+
+    private static class TestProcessor implements Processor<String, String> {
+
+        public final ArrayList<Long> punctuatedAt = new ArrayList<>();
+
+        @Override
+        public void init(ProcessorContext context) {
+        }
+
+        @Override
+        public void process(String key, String value) {
+        }
+
+        @Override
+        public void punctuate(long streamTime) {
+            punctuatedAt.add(streamTime);
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+}
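
The schedule in this test fires the first time stream time reaches sched.timestamp and then again every 100 units of stream time. A minimal queue consistent with that behavior keeps schedules in a priority queue ordered by their next firing time and re-arms each one as it fires. PunctuationSchedule.node() and next() are assumed helpers in this sketch (node() returning the scheduled ProcessorNode, next() a copy advanced by one interval); they are not necessarily the patch's exact API.

    import java.util.Comparator;
    import java.util.PriorityQueue;

    // Sketch: pop every schedule whose timestamp has been reached, punctuate its
    // node, and re-insert the schedule advanced by one interval so it fires again.
    public class PunctuationQueueSketch {

        private final PriorityQueue<PunctuationSchedule> pq =
            new PriorityQueue<>(11, new Comparator<PunctuationSchedule>() {
                @Override
                public int compare(PunctuationSchedule a, PunctuationSchedule b) {
                    return Long.compare(a.timestamp, b.timestamp);
                }
            });

        public void schedule(PunctuationSchedule sched) {
            synchronized (pq) {
                pq.add(sched);
            }
        }

        public boolean mayPunctuate(long streamTime, Punctuator punctuator) {
            synchronized (pq) {
                boolean punctuated = false;
                PunctuationSchedule top = pq.peek();
                while (top != null && top.timestamp <= streamTime) {
                    pq.poll();
                    punctuator.punctuate(top.node(), streamTime);
                    pq.add(top.next());   // assumed helper: same node, timestamp + interval
                    punctuated = true;
                    top = pq.peek();
                }
                return punctuated;
            }
        }
    }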

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
new file mode 100644
index 0000000..6e86410
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.test.MockSourceNode;
+import org.apache.kafka.test.MockTimestampExtractor;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RecordQueueTest {
+    private final Serializer<Integer> intSerializer = new IntegerSerializer();
+    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
+    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
+    private final RecordQueue queue = new RecordQueue(new TopicPartition("topic", 1), new MockSourceNode<>(intDeserializer, intDeserializer));
+
+    private final byte[] recordValue = intSerializer.serialize(null, 10);
+    private final byte[] recordKey = intSerializer.serialize(null, 1);
+
+    @Test
+    public void testTimeTracking() {
+
+        assertTrue(queue.isEmpty());
+
+        // add three out-of-order records with timestamps 2, 1, 3
+        List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue));
+
+        queue.addRawRecords(list1, timestampExtractor);
+
+        assertEquals(3, queue.size());
+        assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
+
+        // poll the first record, now with 1, 3
+        assertEquals(2L, queue.poll().timestamp);
+        assertEquals(2, queue.size());
+        assertEquals(1L, queue.timestamp());
+
+        // poll the second record, now with 3
+        assertEquals(1L, queue.poll().timestamp);
+        assertEquals(1, queue.size());
+        assertEquals(3L, queue.timestamp());
+
+        // add three out-of-order records with timestamps 4, 1, 2
+        // now with 3, 4, 1, 2
+        List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue));
+
+        queue.addRawRecords(list2, timestampExtractor);
+
+        assertEquals(4, queue.size());
+        assertEquals(3L, queue.timestamp());
+
+        // poll the third record, now with 4, 1, 2
+        assertEquals(3L, queue.poll().timestamp);
+        assertEquals(3, queue.size());
+        assertEquals(3L, queue.timestamp());
+
+        // poll the remaining records
+        assertEquals(4L, queue.poll().timestamp);
+        assertEquals(3L, queue.timestamp());
+
+        assertEquals(1L, queue.poll().timestamp);
+        assertEquals(3L, queue.timestamp());
+
+        assertEquals(2L, queue.poll().timestamp);
+        assertEquals(0, queue.size());
+        assertEquals(3L, queue.timestamp());
+
+        // add three more records with timestamps 4, 5, 6
+        List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue));
+
+        queue.addRawRecords(list3, timestampExtractor);
+
+        assertEquals(3, queue.size());
+        assertEquals(3L, queue.timestamp());
+
+        // poll one record again, the timestamp should advance now
+        assertEquals(4L, queue.poll().timestamp);
+        assertEquals(2, queue.size());
+        assertEquals(5L, queue.timestamp());
+    }
+}
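
The assertions above pin down the timestamp bookkeeping: the queue's timestamp() is a partition time that is re-read from a min-timestamp tracker each time a record is polled, never when records are merely buffered, and it never moves backwards. A trimmed, generic sketch of just that bookkeeping follows (deserialization through the source node and the timestamp-extractor call are elided, and it reuses the MinTimestampTrackerSketch shown earlier; names are illustrative, not the patch's):

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Sketch: a FIFO of stamped elements plus a min-timestamp tracker, and a
    // partition time that only advances, and only on poll().
    public class RecordQueueTimestampSketch<E> {

        public static final long NOT_KNOWN = -1L;

        private final Deque<Stamped<E>> fifo = new ArrayDeque<>();
        private final MinTimestampTrackerSketch<E> tracker = new MinTimestampTrackerSketch<>();
        private long partitionTime = NOT_KNOWN;

        public void add(Stamped<E> record) {
            // called once per raw record, after deserialization and timestamp extraction;
            // buffering alone never changes partitionTime, hence NOT_KNOWN before the first poll
            fifo.addLast(record);
            tracker.addElement(record);
        }

        public Stamped<E> poll() {
            Stamped<E> record = fifo.pollFirst();
            if (record == null) {
                return null;
            }
            tracker.removeElement(record);

            // advance the partition time, but never move it backwards
            long candidate = tracker.get();
            if (candidate > partitionTime) {
                partitionTime = candidate;
            }
            return record;
        }

        public long timestamp() {
            return partitionTime;
        }

        public int size() {
            return fifo.size();
        }
    }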

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
new file mode 100644
index 0000000..8dcfc40
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.test.MockSourceNode;
+import org.junit.Test;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StreamTaskTest {
+
+    private final Serializer<Integer> intSerializer = new IntegerSerializer();
+    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
+    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
+
+    private final TopicPartition partition1 = new TopicPartition("topic1", 1);
+    private final TopicPartition partition2 = new TopicPartition("topic2", 1);
+    private final HashSet<TopicPartition> partitions = new HashSet<>(Arrays.asList(partition1, partition2));
+
+    private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final ProcessorTopology topology = new ProcessorTopology(
+        Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
+        new HashMap<String, SourceNode>() {
+            {
+                put("topic1", source1);
+                put("topic2", source2);
+            }
+        });
+
+    private final StreamingConfig config = new StreamingConfig(new Properties() {
+        {
+            setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+            setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+            setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+            setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+            setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+            setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+            setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+        }
+    });
+
+    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
+    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+
+    private final byte[] recordValue = intSerializer.serialize(null, 10);
+    private final byte[] recordKey = intSerializer.serialize(null, 1);
+
+
+    @Before
+    public void setup() {
+        consumer.assign(Arrays.asList(partition1, partition2));
+    }
+
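+    // Feeds three records into each of the task's two partitions and verifies that
+    // process() drains them one at a time, choosing the next partition based on the
+    // record timestamps, and returns the number of records still buffered.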
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testProcessOrder() {
+        StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config);
+
+        task.addRecords(partition1, records(
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue),
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue)
+        ));
+
+        task.addRecords(partition2, records(
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue),
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue)
+        ));
+
+        assertEquals(5, task.process());
+        assertEquals(1, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
+        assertEquals(4, task.process());
+        assertEquals(1, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertEquals(3, task.process());
+        assertEquals(2, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertEquals(2, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertEquals(1, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+
+        assertEquals(0, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        task.close();
+    }
+
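+    // Verifies that the consumer is paused for a partition once more records are
+    // buffered for it than the BUFFERED_RECORDS_PER_PARTITION_CONFIG limit (3 above),
+    // and resumed again as the buffered records are processed.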
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testPauseResume() {
+        StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config);
+
+        task.addRecords(partition1, records(
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue)
+        ));
+
+        task.addRecords(partition2, records(
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue),
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue),
+            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue)
+        ));
+
+        assertEquals(5, task.process());
+        assertEquals(1, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
+
+        task.addRecords(partition1, records(
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue),
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue),
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue)
+        ));
+
+        assertEquals(2, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition1));
+        assertTrue(consumer.paused().contains(partition2));
+
+        assertEquals(7, task.process());
+        assertEquals(1, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertEquals(6, task.process());
+        assertEquals(2, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertEquals(0, consumer.paused().size());
+
+        task.close();
+    }
+
+    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
+        return Arrays.asList(recs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
new file mode 100644
index 0000000..1f3e541
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+public class StreamThreadTest {
+
+    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
+    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
+    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
+    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
+
+    private Properties configProps() {
+        return new Properties() {
+            {
+                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+            }
+        };
+    }
+
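+    // A StreamTask that records whether commit() has been called, so tests can
+    // assert on commit behavior.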
+    private static class TestStreamTask extends StreamTask {
+        public boolean committed = false;
+
+        public TestStreamTask(int id,
+                              Consumer<byte[], byte[]> consumer,
+                              Producer<byte[], byte[]> producer,
+                              Consumer<byte[], byte[]> restoreConsumer,
+                              Collection<TopicPartition> partitions,
+                              ProcessorTopology topology,
+                              StreamingConfig config) {
+            super(id, consumer, producer, restoreConsumer, partitions, topology, config);
+        }
+
+        @Override
+        public void commit() {
+            super.commit();
+            committed = true;
+        }
+    }
+
+    private ByteArraySerializer serializer = new ByteArraySerializer();
+
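+    // Simulates a sequence of rebalances and verifies that the thread creates and
+    // removes stream tasks to match each assignment, grouping partitions that share
+    // a partition id into the task with that id.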
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testPartitionAssignmentChange() throws Exception {
+        StreamingConfig config = new StreamingConfig(configProps());
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, new SystemTime()) {
+            @Override
+            protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+                return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
+            }
+        };
+
+        ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
+
+        assertTrue(thread.tasks().isEmpty());
+
+        List<TopicPartition> revokedPartitions;
+        List<TopicPartition> assignedPartitions;
+        Set<TopicPartition> expectedGroup1;
+        Set<TopicPartition> expectedGroup2;
+
+        revokedPartitions = Collections.emptyList();
+        assignedPartitions = Collections.singletonList(t1p1);
+        expectedGroup1 = new HashSet<>(Arrays.asList(t1p1));
+
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        assertTrue(thread.tasks().containsKey(1));
+        assertEquals(expectedGroup1, thread.tasks().get(1).partitions());
+        assertEquals(1, thread.tasks().size());
+
+        revokedPartitions = assignedPartitions;
+        assignedPartitions = Collections.singletonList(t1p2);
+        expectedGroup2 = new HashSet<>(Arrays.asList(t1p2));
+
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        assertTrue(thread.tasks().containsKey(2));
+        assertEquals(expectedGroup2, thread.tasks().get(2).partitions());
+        assertEquals(1, thread.tasks().size());
+
+        revokedPartitions = assignedPartitions;
+        assignedPartitions = Arrays.asList(t1p1, t1p2);
+        expectedGroup1 = new HashSet<>(Collections.singleton(t1p1));
+        expectedGroup2 = new HashSet<>(Collections.singleton(t1p2));
+
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        assertTrue(thread.tasks().containsKey(1));
+        assertTrue(thread.tasks().containsKey(2));
+        assertEquals(expectedGroup1, thread.tasks().get(1).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(2).partitions());
+        assertEquals(2, thread.tasks().size());
+
+        revokedPartitions = assignedPartitions;
+        assignedPartitions = Arrays.asList(t1p1, t1p2, t2p1, t2p2);
+        expectedGroup1 = new HashSet<>(Arrays.asList(t1p1, t2p1));
+        expectedGroup2 = new HashSet<>(Arrays.asList(t1p2, t2p2));
+
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        assertTrue(thread.tasks().containsKey(1));
+        assertTrue(thread.tasks().containsKey(2));
+        assertEquals(expectedGroup1, thread.tasks().get(1).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(2).partitions());
+        assertEquals(2, thread.tasks().size());
+
+        revokedPartitions = assignedPartitions;
+        assignedPartitions = Collections.emptyList();
+
+        rebalanceListener.onPartitionsRevoked(revokedPartitions);
+        rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        assertTrue(thread.tasks().isEmpty());
+    }
+
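+    // Verifies that maybeClean() removes the state directories of tasks that are no
+    // longer assigned, but only after the configured cleanup delay has elapsed, and
+    // leaves the directories of live tasks and the unrelated "X" directory untouched.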
+    @Test
+    public void testMaybeClean() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final long cleanupDelay = 1000L;
+            Properties props = configProps();
+            props.setProperty(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
+            props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+
+            StreamingConfig config = new StreamingConfig(props);
+
+            File stateDir1 = new File(baseDir, "1");
+            File stateDir2 = new File(baseDir, "2");
+            File stateDir3 = new File(baseDir, "3");
+            File extraDir = new File(baseDir, "X");
+            stateDir1.mkdir();
+            stateDir2.mkdir();
+            stateDir3.mkdir();
+            extraDir.mkdir();
+
+            MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+            MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+            final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+            MockTime mockTime = new MockTime();
+
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.addSource("source1", "topic1");
+
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, mockTime) {
+                @Override
+                public void maybeClean() {
+                    super.maybeClean();
+                }
+                @Override
+                protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
+                }
+            };
+
+            ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
+
+            assertTrue(thread.tasks().isEmpty());
+            mockTime.sleep(cleanupDelay);
+
+            // all directories should still exist since no assignment has happened yet
+            assertTrue(stateDir1.exists());
+            assertTrue(stateDir2.exists());
+            assertTrue(stateDir3.exists());
+            assertTrue(extraDir.exists());
+
+            List<TopicPartition> revokedPartitions;
+            List<TopicPartition> assignedPartitions;
+            Map<Integer, StreamTask> prevTasks;
+
+            //
+            // Assign t1p1 and t1p2. This should create Task 1 & 2
+            //
+            revokedPartitions = Collections.emptyList();
+            assignedPartitions = Arrays.asList(t1p1, t1p2);
+            prevTasks = new HashMap<>(thread.tasks());
+
+            rebalanceListener.onPartitionsRevoked(revokedPartitions);
+            rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+            // there shouldn't be any previous task
+            assertTrue(prevTasks.isEmpty());
+
+            // task 1 & 2 are created
+            assertEquals(2, thread.tasks().size());
+
+            // all directories should still exist before the cleanup delay time
+            mockTime.sleep(cleanupDelay - 10L);
+            thread.maybeClean();
+            assertTrue(stateDir1.exists());
+            assertTrue(stateDir2.exists());
+            assertTrue(stateDir3.exists());
+            assertTrue(extraDir.exists());
+
+            // all state directories except those for tasks 1 & 2 are removed; the extra directory should still exist
+            mockTime.sleep(11L);
+            thread.maybeClean();
+            assertTrue(stateDir1.exists());
+            assertTrue(stateDir2.exists());
+            assertFalse(stateDir3.exists());
+            assertTrue(extraDir.exists());
+
+            //
+            // Revoke t1p1 and t1p2. This should remove Task 1 & 2
+            //
+            revokedPartitions = assignedPartitions;
+            assignedPartitions = Collections.emptyList();
+            prevTasks = new HashMap<>(thread.tasks());
+
+            rebalanceListener.onPartitionsRevoked(revokedPartitions);
+            rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+            // previous tasks should be committed
+            assertEquals(2, prevTasks.size());
+            for (StreamTask task : prevTasks.values()) {
+                assertTrue(((TestStreamTask) task).committed);
+                ((TestStreamTask) task).committed = false;
+            }
+
+            // no task
+            assertTrue(thread.tasks().isEmpty());
+
+            // all state directories for task 1 & 2 still exist before the cleanup delay time
+            mockTime.sleep(cleanupDelay - 10L);
+            thread.maybeClean();
+            assertTrue(stateDir1.exists());
+            assertTrue(stateDir2.exists());
+            assertFalse(stateDir3.exists());
+            assertTrue(extraDir.exists());
+
+            // all state directories for task 1 & 2 are removed
+            mockTime.sleep(11L);
+            thread.maybeClean();
+            assertFalse(stateDir1.exists());
+            assertFalse(stateDir2.exists());
+            assertFalse(stateDir3.exists());
+            assertTrue(extraDir.exists());
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
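+    // Verifies that maybeCommit() commits all of the thread's tasks only once the
+    // configured commit interval has elapsed since the previous commit.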
+    @Test
+    public void testMaybeCommit() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final long commitInterval = 1000L;
+            Properties props = configProps();
+            props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+            props.setProperty(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
+
+            StreamingConfig config = new StreamingConfig(props);
+
+            MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+            MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+            final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+            MockTime mockTime = new MockTime();
+
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.addSource("source1", "topic1");
+
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, mockTime) {
+                @Override
+                public void maybeCommit() {
+                    super.maybeCommit();
+                }
+                @Override
+                protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
+                    return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
+                }
+            };
+
+            ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
+
+            List<TopicPartition> revokedPartitions;
+            List<TopicPartition> assignedPartitions;
+
+            //
+            // Assign t1p1 and t1p2. This should create Task 1 & 2
+            //
+            revokedPartitions = Collections.emptyList();
+            assignedPartitions = Arrays.asList(t1p1, t1p2);
+
+            rebalanceListener.onPartitionsRevoked(revokedPartitions);
+            rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+            assertEquals(2, thread.tasks().size());
+
+            // no task is committed before the commit interval
+            mockTime.sleep(commitInterval - 10L);
+            thread.maybeCommit();
+            for (StreamTask task : thread.tasks().values()) {
+                assertFalse(((TestStreamTask) task).committed);
+            }
+
+            // all tasks are committed after the commit interval
+            mockTime.sleep(11L);
+            thread.maybeCommit();
+            for (StreamTask task : thread.tasks().values()) {
+                assertTrue(((TestStreamTask) task).committed);
+                ((TestStreamTask) task).committed = false;
+            }
+
+            // no task is committed before the commit interval, again
+            mockTime.sleep(commitInterval - 10L);
+            thread.maybeCommit();
+            for (StreamTask task : thread.tasks().values()) {
+                assertFalse(((TestStreamTask) task).committed);
+            }
+
+            // all tasks are committed after the commit interval, again
+            mockTime.sleep(11L);
+            thread.maybeCommit();
+            for (StreamTask task : thread.tasks().values()) {
+                assertTrue(((TestStreamTask) task).committed);
+                ((TestStreamTask) task).committed = false;
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
new file mode 100644
index 0000000..2c42e6c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+
+import java.util.List;
+
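+/**
+ * A simple test driver: it builds the topology from the given KStreamBuilder,
+ * initializes every processor node with a MockProcessorContext, and lets tests
+ * push records into a topic's source node via process(). Calls to forward()
+ * propagate a key-value pair to the children of the node currently being processed.
+ */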
+public class KStreamTestDriver {
+
+    private final ProcessorTopology topology;
+    private final MockProcessorContext context;
+    private ProcessorNode currNode;
+
+    public KStreamTestDriver(KStreamBuilder builder) {
+        this(builder, null, null);
+    }
+
+    public KStreamTestDriver(KStreamBuilder builder, Serializer<?> serializer, Deserializer<?> deserializer) {
+        this.topology = builder.build();
+        this.context = new MockProcessorContext(this, serializer, deserializer);
+
+        for (ProcessorNode node : topology.processors()) {
+            currNode = node;
+            try {
+                node.init(context);
+            } finally {
+                currNode = null;
+            }
+        }
+    }
+
+    public void process(String topicName, Object key, Object value) {
+        currNode = topology.source(topicName);
+        try {
+            forward(key, value);
+        } finally {
+            currNode = null;
+        }
+    }
+
+    public void setTime(long timestamp) {
+        context.setTime(timestamp);
+    }
+
+    public StateStore getStateStore(String name) {
+        return context.getStateStore(name);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value) {
+        ProcessorNode thisNode = currNode;
+        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+            currNode = childNode;
+            try {
+                childNode.process(key, value);
+            } finally {
+                currNode = thisNode;
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value, int childIndex) {
+        ProcessorNode thisNode = currNode;
+        ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
+        currNode = childNode;
+        try {
+            childNode.process(key, value);
+        } finally {
+            currNode = thisNode;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
new file mode 100644
index 0000000..3fdfc82
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
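+/**
+ * A ProcessorContext for tests: forward() delegates to the KStreamTestDriver,
+ * registered state stores are kept in an in-memory map, and operations the
+ * driver does not emulate throw UnsupportedOperationException.
+ */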
+public class MockProcessorContext implements ProcessorContext {
+
+    private final KStreamTestDriver driver;
+    private final Serializer serializer;
+    private final Deserializer deserializer;
+
+    private Map<String, StateStore> storeMap = new HashMap<>();
+
+    long timestamp = -1L;
+
+    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> serializer, Deserializer<?> deserializer) {
+        this.driver = driver;
+        this.serializer = serializer;
+        this.deserializer = deserializer;
+    }
+
+    public void setTime(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public int id() {
+        return -1;
+    }
+
+    @Override
+    public boolean joinable() {
+        return true;
+    }
+
+    @Override
+    public Serializer<?> keySerializer() {
+        return serializer;
+    }
+
+    @Override
+    public Serializer<?> valueSerializer() {
+        return serializer;
+    }
+
+    @Override
+    public Deserializer<?> keyDeserializer() {
+        return deserializer;
+    }
+
+    @Override
+    public Deserializer<?> valueDeserializer() {
+        return deserializer;
+    }
+
+    @Override
+    public File stateDir() {
+        throw new UnsupportedOperationException("stateDir() not supported.");
+    }
+
+    @Override
+    public Metrics metrics() {
+        throw new UnsupportedOperationException("metrics() not supported.");
+    }
+
+    @Override
+    public void register(StateStore store, RestoreFunc func) {
+        if (func != null) throw new UnsupportedOperationException("RestoreFunc not supported.");
+        storeMap.put(store.name(), store);
+    }
+
+    @Override
+    public StateStore getStateStore(String name) {
+        return storeMap.get(name);
+    }
+
+    @Override
+    public void schedule(long interval) {
+        throw new UnsupportedOperationException("schedule() not supported");
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value) {
+        driver.forward(key, value);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value, int childIndex) {
+        driver.forward(key, value, childIndex);
+    }
+
+    @Override
+    public void commit() {
+        throw new UnsupportedOperationException("commit() not supported.");
+    }
+
+    @Override
+    public String topic() {
+        throw new UnsupportedOperationException("topic() not supported.");
+    }
+
+    @Override
+    public int partition() {
+        throw new UnsupportedOperationException("partition() not supported.");
+    }
+
+    @Override
+    public long offset() {
+        throw new UnsupportedOperationException("offset() not supported.");
+    }
+
+    @Override
+    public long timestamp() {
+        return this.timestamp;
+    }
+
+}

