http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
deleted file mode 100644
index 2a5ca9b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class DefaultPartitionGrouperTest {
-
- private List<PartitionInfo> infos = Arrays.asList(
- new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
- );
-
- private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
- @Test
- public void testGrouping() {
- PartitionGrouper grouper = new DefaultPartitionGrouper();
- int topicGroupId;
- Map<TaskId, Set<TopicPartition>> expected;
- Map<Integer, Set<String>> topicGroups;
-
- topicGroups = new HashMap<>();
- topicGroups.put(0, mkSet("topic1"));
- topicGroups.put(1, mkSet("topic2"));
- grouper.topicGroups(topicGroups);
-
- expected = new HashMap<>();
- topicGroupId = 0;
- expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
- expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
- expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
- topicGroupId++;
- expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
- expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
-
- assertEquals(expected, grouper.partitionGroups(metadata));
-
- topicGroups = new HashMap<>();
- topicGroups.put(0, mkSet("topic1", "topic2"));
- grouper.topicGroups(topicGroups);
-
- expected = new HashMap<>();
- topicGroupId = 0;
- expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
- expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
- expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
-
- assertEquals(expected, grouper.partitionGroups(metadata));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
deleted file mode 100644
index b1b71b6..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import static org.junit.Assert.assertEquals;
-
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.common.utils.Utils.mkList;
-
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class TopologyBuilderTest {
-
- @Test(expected = TopologyException.class)
- public void testAddSourceWithSameName() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "topic-1");
- builder.addSource("source", "topic-2");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddSourceWithSameTopic() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "topic-1");
- builder.addSource("source-2", "topic-1");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddProcessorWithSameName() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "topic-1");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddProcessorWithWrongParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddProcessorWithSelfParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddSinkWithSameName() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "topic-1");
- builder.addSink("sink", "topic-2", "source");
- builder.addSink("sink", "topic-3", "source");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddSinkWithWrongParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSink("sink", "topic-2", "source");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddSinkWithSelfParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSink("sink", "topic-2", "sink");
- }
-
- @Test
- public void testSourceTopics() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source-1", "topic-1");
- builder.addSource("source-2", "topic-2");
- builder.addSource("source-3", "topic-3");
-
- assertEquals(3, builder.sourceTopics().size());
- }
-
- @Test(expected = TopologyException.class)
- public void testAddStateStoreWithNonExistingProcessor() {
- final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processor");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddStateStoreWithSource() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source-1", "topic-1");
- builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddStateStoreWithSink() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSink("sink-1", "topic-1");
- builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
- }
-
- @Test(expected = TopologyException.class)
- public void testAddStateStoreWithDuplicates() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addStateStore(new MockStateStoreSupplier("store", false));
- builder.addStateStore(new MockStateStoreSupplier("store", false));
- }
-
- @Test
- public void testAddStateStore() {
- final TopologyBuilder builder = new TopologyBuilder();
- List<StateStoreSupplier> suppliers;
-
- StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
- builder.addStateStore(supplier);
- suppliers = builder.build(null).stateStoreSuppliers();
- assertEquals(0, suppliers.size());
-
- builder.addSource("source-1", "topic-1");
- builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
- builder.connectProcessorAndStateStores("processor-1", "store-1");
- suppliers = builder.build(null).stateStoreSuppliers();
- assertEquals(1, suppliers.size());
- assertEquals(supplier.name(), suppliers.get(0).name());
- }
-
- @Test
- public void testTopicGroups() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source-1", "topic-1", "topic-1x");
- builder.addSource("source-2", "topic-2");
- builder.addSource("source-3", "topic-3");
- builder.addSource("source-4", "topic-4");
- builder.addSource("source-5", "topic-5");
-
- builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-
- builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
- builder.copartitionSources(mkList("source-1", "source-2"));
-
- builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
-
- Map<Integer, Set<String>> topicGroups = builder.topicGroups();
-
- Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
- expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
- expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
- expectedTopicGroups.put(2, mkSet("topic-5"));
-
- assertEquals(3, topicGroups.size());
- assertEquals(expectedTopicGroups, topicGroups);
-
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
-
- assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
- }
-
- @Test
- public void testTopicGroupsByStateStore() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source-1", "topic-1", "topic-1x");
- builder.addSource("source-2", "topic-2");
- builder.addSource("source-3", "topic-3");
- builder.addSource("source-4", "topic-4");
- builder.addSource("source-5", "topic-5");
-
- builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
- builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
-        builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");
-
- builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
- builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
-        builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");
-
- Map<Integer, Set<String>> topicGroups = builder.topicGroups();
-
- Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
- expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
- expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
- expectedTopicGroups.put(2, mkSet("topic-5"));
-
- assertEquals(3, topicGroups.size());
- assertEquals(expectedTopicGroups, topicGroups);
- }
-
- @Test
- public void testBuild() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source-1", "topic-1", "topic-1x");
- builder.addSource("source-2", "topic-2");
- builder.addSource("source-3", "topic-3");
- builder.addSource("source-4", "topic-4");
- builder.addSource("source-5", "topic-5");
-
- builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
- builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
- builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
-
- ProcessorTopology topology0 = builder.build(0);
- ProcessorTopology topology1 = builder.build(1);
- ProcessorTopology topology2 = builder.build(2);
-
- assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
- assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
- assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
- }
-
- private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
- Set<String> nodeNames = new HashSet<>();
- for (ProcessorNode node : nodes) {
- nodeNames.add(node.name());
- }
- return nodeNames;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
deleted file mode 100644
index 0a1f95c..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class MinTimestampTrackerTest {
-
- private Stamped<String> elem(long timestamp) {
- return new Stamped<>("", timestamp);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testTracking() {
- TimestampTracker<String> tracker = new MinTimestampTracker<>();
-
- Object[] elems = new Object[]{
- elem(100), elem(101), elem(102), elem(98), elem(99), elem(100)
- };
-
- int insertionIndex = 0;
- int removalIndex = 0;
-
- // add 100
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(100L, tracker.get());
-
- // add 101
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(100L, tracker.get());
-
- // remove 100
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(101L, tracker.get());
-
- // add 102
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(101L, tracker.get());
-
- // add 98
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(98L, tracker.get());
-
- // add 99
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(98L, tracker.get());
-
- // add 100
- tracker.addElement((Stamped<String>) elems[insertionIndex++]);
- assertEquals(98L, tracker.get());
-
- // remove 101
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(98L, tracker.get());
-
- // remove 102
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(98L, tracker.get());
-
- // remove 98
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(99L, tracker.get());
-
- // remove 99
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(100L, tracker.get());
-
- // remove 100
- tracker.removeElement((Stamped<String>) elems[removalIndex++]);
- assertEquals(100L, tracker.get());
-
- assertEquals(insertionIndex, removalIndex);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
deleted file mode 100644
index b91acdc..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.test.MockSourceNode;
-import org.apache.kafka.test.MockTimestampExtractor;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-public class PartitionGroupTest {
- private final Serializer<Integer> intSerializer = new IntegerSerializer();
- private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
- private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
- private final TopicPartition partition1 = new TopicPartition("topic", 1);
- private final TopicPartition partition2 = new TopicPartition("topic", 2);
- private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(intDeserializer, intDeserializer));
- private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(intDeserializer, intDeserializer));
-
- private final byte[] recordValue = intSerializer.serialize(null, 10);
- private final byte[] recordKey = intSerializer.serialize(null, 1);
-
- private final PartitionGroup group = new PartitionGroup(new HashMap<TopicPartition, RecordQueue>() {
- {
- put(partition1, queue1);
- put(partition2, queue2);
- }
- }, timestampExtractor);
-
- @Test
- public void testTimeTracking() {
- assertEquals(0, group.numBuffered());
-
-        // add three records with timestamps 1, 3, 5 to partition-1
- List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue));
-
- group.addRawRecords(partition1, list1);
-
-        // add three records with timestamps 2, 4, 6 to partition-2
- List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue));
-
- group.addRawRecords(partition2, list2);
-
- assertEquals(6, group.numBuffered());
- assertEquals(3, group.numBuffered(partition1));
- assertEquals(3, group.numBuffered(partition2));
- assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp());
-
- StampedRecord record;
- PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
-
- // get one record
- record = group.nextRecord(info);
- assertEquals(partition1, info.partition());
- assertEquals(1L, record.timestamp);
- assertEquals(5, group.numBuffered());
- assertEquals(2, group.numBuffered(partition1));
- assertEquals(3, group.numBuffered(partition2));
- assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp());
-
- // get one record, now the time should be advanced
- record = group.nextRecord(info);
- assertEquals(partition2, info.partition());
- assertEquals(2L, record.timestamp);
- assertEquals(4, group.numBuffered());
- assertEquals(2, group.numBuffered(partition1));
- assertEquals(2, group.numBuffered(partition2));
- assertEquals(3L, group.timestamp());
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
deleted file mode 100644
index c447f99..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.channels.FileLock;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-public class ProcessorStateManagerTest {
-
- private class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
- private final Serializer<Integer> serializer = new IntegerSerializer();
-
- public TopicPartition assignedPartition = null;
- public TopicPartition seekPartition = null;
- public long seekOffset = -1L;
- public boolean seekToBeginingCalled = false;
- public boolean seekToEndCalled = false;
- private long endOffset = 0L;
- private long currentOffset = 0L;
-
- private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList<>();
-
- MockRestoreConsumer() {
- super(OffsetResetStrategy.EARLIEST);
-
- reset();
- }
-
- // reset this mock restore consumer for a state store registration
- public void reset() {
- assignedPartition = null;
- seekOffset = -1L;
- seekToBeginingCalled = false;
- seekToEndCalled = false;
- endOffset = 0L;
- recordBuffer.clear();
- }
-
-        // buffer a record (we cannot use addRecord because we need to add records before assigning a partition)
- public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
- recordBuffer.add(
- new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
- serializer.serialize(record.topic(), record.key()),
- serializer.serialize(record.topic(), record.value())));
- endOffset = record.offset();
-
- super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset));
- }
-
- @Override
- public synchronized void assign(List<TopicPartition> partitions) {
- int numPartitions = partitions.size();
- if (numPartitions > 1)
- throw new IllegalArgumentException("RestoreConsumer: more than one partition specified");
-
- if (numPartitions == 1) {
- if (assignedPartition != null)
- throw new IllegalStateException("RestoreConsumer: partition already assigned");
- assignedPartition = partitions.get(0);
-
- // set the beginning offset to 0
-                // NOTE: it is the user's responsibility to set the initial LEO (log end offset).
- super.updateBeginningOffsets(Collections.singletonMap(assignedPartition, 0L));
- }
-
- super.assign(partitions);
- }
-
- @Override
- public ConsumerRecords<byte[], byte[]> poll(long timeout) {
- // add buffered records to MockConsumer
- for (ConsumerRecord<byte[], byte[]> record : recordBuffer) {
- super.addRecord(record);
- }
- recordBuffer.clear();
-
- ConsumerRecords<byte[], byte[]> records = super.poll(timeout);
-
- // set the current offset
- Iterable<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(assignedPartition);
- for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
- currentOffset = record.offset();
- }
-
- return records;
- }
-
- @Override
- public synchronized long position(TopicPartition partition) {
- if (!partition.equals(assignedPartition))
- throw new IllegalStateException("RestoreConsumer: unassigned partition");
-
- return currentOffset;
- }
-
- @Override
- public synchronized void seek(TopicPartition partition, long offset) {
- if (offset < 0)
- throw new IllegalArgumentException("RestoreConsumer: offset should not be negative");
-
- if (seekOffset >= 0)
- throw new IllegalStateException("RestoreConsumer: offset already seeked");
-
- seekPartition = partition;
- seekOffset = offset;
- currentOffset = offset;
- super.seek(partition, offset);
- }
-
- @Override
- public synchronized void seekToBeginning(TopicPartition... partitions) {
- if (partitions.length != 1)
- throw new IllegalStateException("RestoreConsumer: other than one partition specified");
-
- for (TopicPartition partition : partitions) {
- if (!partition.equals(assignedPartition))
-                    throw new IllegalStateException("RestoreConsumer: seek-to-beginning not on the assigned partition");
- }
-
- seekToBeginingCalled = true;
- currentOffset = 0L;
- }
-
- @Override
- public synchronized void seekToEnd(TopicPartition... partitions) {
- if (partitions.length != 1)
- throw new IllegalStateException("RestoreConsumer: other than one partition specified");
-
- for (TopicPartition partition : partitions) {
- if (!partition.equals(assignedPartition))
- throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition");
- }
-
- seekToEndCalled = true;
- currentOffset = endOffset;
- }
- }
-
- @Test
- public void testLockStateDirectory() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- FileLock lock;
-
- // the state manager locks the directory
- ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
-
- try {
- // this should not get the lock
- lock = ProcessorStateManager.lockStateDirectory(baseDir);
- assertNull(lock);
- } finally {
-                // closing the state manager releases the lock
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
- }
-
- // now, this should get the lock
- lock = ProcessorStateManager.lockStateDirectory(baseDir);
- try {
- assertNotNull(lock);
- } finally {
- if (lock != null) lock.release();
- }
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test(expected = IllegalStateException.class)
- public void testNoTopic() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
-
- ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
- try {
- stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
- } finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
- }
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testRegisterPersistentStore() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- long lastCheckpointedOffset = 10L;
- OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
- checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
-
- MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
- new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0])
- ));
- restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
-
- MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
-
- ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
- try {
- restoreConsumer.reset();
-
- ArrayList<Integer> expectedKeys = new ArrayList<>();
- for (int i = 1; i <= 3; i++) {
- long offset = (long) i;
- int key = i * 10;
- expectedKeys.add(key);
- restoreConsumer.bufferRecord(
- new ConsumerRecord<>("persistentStore", 2, offset, key, 0)
- );
- }
-
- stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
-
- assertEquals(new TopicPartition("persistentStore", 2), restoreConsumer.assignedPartition);
- assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset);
- assertFalse(restoreConsumer.seekToBeginingCalled);
- assertTrue(restoreConsumer.seekToEndCalled);
- assertEquals(expectedKeys, persistentStore.keys);
-
- } finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
- }
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testRegisterNonPersistentStore() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- long lastCheckpointedOffset = 10L;
- OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
- checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
-
- MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
- new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0])
- ));
- restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
-
- MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store
-
- ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
- try {
- restoreConsumer.reset();
-
- ArrayList<Integer> expectedKeys = new ArrayList<>();
- for (int i = 1; i <= 3; i++) {
- long offset = (long) (i + 100);
- int key = i;
- expectedKeys.add(i);
- restoreConsumer.bufferRecord(
- new ConsumerRecord<>("nonPersistentStore", 2, offset, key, 0)
- );
- }
-
- stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
-
- assertEquals(new TopicPartition("nonPersistentStore", 2), restoreConsumer.assignedPartition);
- assertEquals(0L, restoreConsumer.seekOffset);
- assertTrue(restoreConsumer.seekToBeginingCalled);
- assertTrue(restoreConsumer.seekToEndCalled);
- assertEquals(expectedKeys, nonPersistentStore.keys);
- } finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
- }
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testGetStore() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- restoreConsumer.updatePartitions("mockStore", Arrays.asList(
- new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
- ));
-
- MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
-
- ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
- try {
- stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
-
- assertNull(stateMgr.getStore("noSuchStore"));
- assertEquals(mockStateStore, stateMgr.getStore("mockStore"));
-
- } finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
- }
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testClose() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- File checkpointFile = new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME);
- try {
- // write an empty checkpoint file
- OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
- oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
-
- MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
- restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
- new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0])
- ));
- restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
- new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0])
- ));
-
- // set up ack'ed offsets
- HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
- ackedOffsets.put(new TopicPartition("persistentStore", 1), 123L);
- ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L);
- ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L);
-
- MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
- MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
-
- ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
- try {
- // make sure the checkpoint file is deleted
- assertFalse(checkpointFile.exists());
-
- restoreConsumer.reset();
- stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
-
- restoreConsumer.reset();
- stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
- } finally {
- // close the state manager with the ack'ed offsets
- stateMgr.close(ackedOffsets);
- }
-
- // make sure all stores are closed, and the checkpoint file is written.
- assertTrue(persistentStore.flushed);
- assertTrue(persistentStore.closed);
- assertTrue(nonPersistentStore.flushed);
- assertTrue(nonPersistentStore.closed);
- assertTrue(checkpointFile.exists());
-
- // the checkpoint file should contain an offset from the persistent store only.
- OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
- Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
- assertEquals(1, checkpointedOffsets.size());
- assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition("persistentStore", 1)));
- } finally {
- Utils.delete(baseDir);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
deleted file mode 100644
index 54096b2..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateUtils;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Properties;
-
-public class ProcessorTopologyTest {
-
- private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
- private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
-
- protected static final String INPUT_TOPIC = "input-topic";
- protected static final String OUTPUT_TOPIC_1 = "output-topic-1";
- protected static final String OUTPUT_TOPIC_2 = "output-topic-2";
-
- private static long timestamp = 1000L;
-
- private ProcessorTopologyTestDriver driver;
- private StreamingConfig config;
-
- @Before
- public void setup() {
- // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
- File localState = StateUtils.tempDir();
- Properties props = new Properties();
- props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamingConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
- props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
- props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- this.config = new StreamingConfig(props);
- }
-
- @After
- public void cleanup() {
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
-
- @Test
- public void testTopologyMetadata() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source-1", "topic-1");
- builder.addSource("source-2", "topic-2", "topic-3");
- builder.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1");
- builder.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1", "source-2");
- builder.addSink("sink-1", "topic-3", "processor-1");
- builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
-
- final ProcessorTopology topology = builder.build(null);
-
- assertEquals(6, topology.processors().size());
-
- assertEquals(2, topology.sources().size());
-
- assertEquals(3, topology.sourceTopics().size());
-
- assertNotNull(topology.source("topic-1"));
-
- assertNotNull(topology.source("topic-2"));
-
- assertNotNull(topology.source("topic-3"));
-
- assertEquals(topology.source("topic-2"), topology.source("topic-3"));
- }
-
- @Test
- public void testDrivingSimpleTopology() {
- driver = new ProcessorTopologyTestDriver(config, createSimpleTopology());
- driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1");
- assertNoOutputRecord(OUTPUT_TOPIC_2);
-
- driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2");
- assertNoOutputRecord(OUTPUT_TOPIC_2);
-
- driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
- assertNoOutputRecord(OUTPUT_TOPIC_2);
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3");
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4");
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5");
- }
-
- @Test
- public void testDrivingMultiplexingTopology() {
- driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology());
- driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
- assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
-
- driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
- assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
-
- driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
- assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
- assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)");
- assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)");
- assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)");
- }
-
- @Test
- public void testDrivingStatefulTopology() {
- String storeName = "entries";
- driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
- driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
- driver.process(INPUT_TOPIC, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
- assertNoOutputRecord(OUTPUT_TOPIC_1);
-
- KeyValueStore<String, String> store = driver.getKeyValueStore("entries");
- assertEquals("value4", store.get("key1"));
- assertEquals("value2", store.get("key2"));
- assertEquals("value3", store.get("key3"));
- assertNull(store.get("key4"));
- }
-
- protected void assertNextOutputRecord(String topic, String key, String value) {
- assertProducerRecord(driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER), topic, key, value);
- }
-
- protected void assertNoOutputRecord(String topic) {
- assertNull(driver.readOutput(topic));
- }
-
- private void assertProducerRecord(ProducerRecord<String, String> record, String topic, String key, String value) {
- assertEquals(topic, record.topic());
- assertEquals(key, record.key());
- assertEquals(value, record.value());
- // Kafka Streaming doesn't set the partition, so it's always null
- assertNull(record.partition());
- }
-
- protected TopologyBuilder createSimpleTopology() {
- return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
- .addProcessor("processor", define(new ForwardingProcessor()), "source")
- .addSink("sink", OUTPUT_TOPIC_1, "processor");
- }
-
- protected TopologyBuilder createMultiplexingTopology() {
- return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
- .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
- .addSink("sink1", OUTPUT_TOPIC_1, "processor")
- .addSink("sink2", OUTPUT_TOPIC_2, "processor");
- }
-
- protected TopologyBuilder createStatefulTopology(String storeName) {
- return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
- .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
- .addStateStore(
- Stores.create(storeName, config).withStringKeys().withStringValues().inMemory().build(),
- "processor"
- )
- .addSink("counts", OUTPUT_TOPIC_1, "processor");
- }
-
- /**
- * A processor that simply forwards all messages to all children.
- */
- protected static class ForwardingProcessor extends AbstractProcessor<String, String> {
-
- @Override
- public void process(String key, String value) {
- context().forward(key, value);
- }
-
- @Override
- public void punctuate(long streamTime) {
- context().forward(Long.toString(streamTime), "punctuate");
- }
- }
-
- /**
- * A processor that forwards slightly-modified messages to each child.
- */
- protected static class MultiplexingProcessor extends AbstractProcessor<String, String> {
-
- private final int numChildren;
-
- public MultiplexingProcessor(int numChildren) {
- this.numChildren = numChildren;
- }
-
- @Override
- public void process(String key, String value) {
- for (int i = 0; i != numChildren; ++i) {
- context().forward(key, value + "(" + (i + 1) + ")", i);
- }
- }
-
- @Override
- public void punctuate(long streamTime) {
- for (int i = 0; i != numChildren; ++i) {
- context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i);
- }
- }
- }
-
- /**
- * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When
- * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
- */
- protected static class StatefulProcessor extends AbstractProcessor<String, String> {
-
- private KeyValueStore<String, String> store;
- private final String storeName;
-
- public StatefulProcessor(String storeName) {
- this.storeName = storeName;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void init(ProcessorContext context) {
- super.init(context);
- store = (KeyValueStore<String, String>) context.getStateStore(storeName);
- }
-
- @Override
- public void process(String key, String value) {
- store.put(key, value);
- }
-
- @Override
- public void punctuate(long streamTime) {
- int count = 0;
- for (KeyValueIterator<String, String> iter = store.all(); iter.hasNext();) {
- iter.next();
- ++count;
- }
- context().forward(Long.toString(streamTime), count);
- }
-
- @Override
- public void close() {
- store.close();
- }
- }
-
- protected <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
- return new ProcessorSupplier<K, V>() {
- @Override
- public Processor<K, V> get() {
- return processor;
- }
- };
- }
-
- public static class CustomTimestampExtractor implements TimestampExtractor {
- @Override
- public long extract(ConsumerRecord<Object, Object> record) {
- return timestamp;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
deleted file mode 100644
index 2c7aaeb..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.assertEquals;
-
-public class PunctuationQueueTest {
-
- @Test
- public void testPunctuationInterval() {
- TestProcessor processor = new TestProcessor();
- ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
- PunctuationQueue queue = new PunctuationQueue();
-
- PunctuationSchedule sched = new PunctuationSchedule(node, 100L);
- final long now = sched.timestamp - 100L;
-
- queue.schedule(sched);
-
- Punctuator punctuator = new Punctuator() {
- public void punctuate(ProcessorNode node, long time) {
- node.processor().punctuate(time);
- }
- };
-
- queue.mayPunctuate(now, punctuator);
- assertEquals(0, processor.punctuatedAt.size());
-
- queue.mayPunctuate(now + 99L, punctuator);
- assertEquals(0, processor.punctuatedAt.size());
-
- queue.mayPunctuate(now + 100L, punctuator);
- assertEquals(1, processor.punctuatedAt.size());
-
- queue.mayPunctuate(now + 199L, punctuator);
- assertEquals(1, processor.punctuatedAt.size());
-
- queue.mayPunctuate(now + 200L, punctuator);
- assertEquals(2, processor.punctuatedAt.size());
- }
-
- private static class TestProcessor implements Processor<String, String> {
-
- public final ArrayList<Long> punctuatedAt = new ArrayList<>();
-
- @Override
- public void init(ProcessorContext context) {
- }
-
- @Override
- public void process(String key, String value) {
- }
-
- @Override
- public void punctuate(long streamTime) {
- punctuatedAt.add(streamTime);
- }
-
- @Override
- public void close() {
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
deleted file mode 100644
index c40e881..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class QuickUnionTest {
-
- @SuppressWarnings("unchecked")
- @Test
- public void testUnite() {
- QuickUnion<Long> qu = new QuickUnion<>();
-
- long[] ids = {
- 1L, 2L, 3L, 4L, 5L
- };
-
- for (long id : ids) {
- qu.add(id);
- }
-
- assertEquals(5, roots(qu, ids).size());
-
- qu.unite(1L, 2L);
- assertEquals(4, roots(qu, ids).size());
- assertEquals(qu.root(1L), qu.root(2L));
-
- qu.unite(3L, 4L);
- assertEquals(3, roots(qu, ids).size());
- assertEquals(qu.root(1L), qu.root(2L));
- assertEquals(qu.root(3L), qu.root(4L));
-
- qu.unite(1L, 5L);
- assertEquals(2, roots(qu, ids).size());
- assertEquals(qu.root(1L), qu.root(2L));
- assertEquals(qu.root(2L), qu.root(5L));
- assertEquals(qu.root(3L), qu.root(4L));
-
- qu.unite(3L, 5L);
- assertEquals(1, roots(qu, ids).size());
- assertEquals(qu.root(1L), qu.root(2L));
- assertEquals(qu.root(2L), qu.root(3L));
- assertEquals(qu.root(3L), qu.root(4L));
- assertEquals(qu.root(4L), qu.root(5L));
- }
-
- @Test
- public void testUniteMany() {
- QuickUnion<Long> qu = new QuickUnion<>();
-
- long[] ids = {
- 1L, 2L, 3L, 4L, 5L
- };
-
- for (long id : ids) {
- qu.add(id);
- }
-
- assertEquals(5, roots(qu, ids).size());
-
- qu.unite(1L, 2L, 3L, 4L);
- assertEquals(2, roots(qu, ids).size());
- assertEquals(qu.root(1L), qu.root(2L));
- assertEquals(qu.root(2L), qu.root(3L));
- assertEquals(qu.root(3L), qu.root(4L));
- assertNotEquals(qu.root(1L), qu.root(5L));
- }
-
- private Set<Long> roots(QuickUnion<Long> qu, long... ids) {
- HashSet<Long> roots = new HashSet<>();
- for (long id : ids) {
- roots.add(qu.root(id));
- }
- return roots;
- }
-}
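
The QuickUnionTest removed above exercises a small union-find helper: add() registers an element as the root of its own group, unite() merges one or more elements into a single group, and root() returns the group's representative so that united elements compare equal. The QuickUnion implementation itself is not part of this diff; the sketch below is only a minimal HashMap-backed union-find with path halving that would satisfy the assertions in testUnite and testUniteMany, not the actual Kafka code.

package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.Map;

// Minimal union-find sketch (an assumption for illustration, not the deleted
// Kafka implementation). Elements must be add()-ed before root() or unite().
public class QuickUnion<T> {

    private final Map<T, T> parents = new HashMap<>();

    // Register an element as the root of its own singleton group.
    public void add(T id) {
        parents.put(id, id);
    }

    // Return the representative of the group containing id, halving paths along the way.
    public T root(T id) {
        T current = id;
        T parent = parents.get(current);
        while (!parent.equals(current)) {
            // path halving: re-point current at its grandparent to flatten the tree
            T grandparent = parents.get(parent);
            parents.put(current, grandparent);
            current = grandparent;
            parent = parents.get(current);
        }
        return current;
    }

    // Merge the group of each element in idList into the group of id1.
    @SafeVarargs
    public final void unite(T id1, T... idList) {
        for (T id2 : idList) {
            parents.put(root(id2), root(id1));
        }
    }
}

With a generic varargs unite() like this, the @SuppressWarnings("unchecked") annotations on the test methods above also fall into place.
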
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
deleted file mode 100644
index 6e86410..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.test.MockSourceNode;
-import org.apache.kafka.test.MockTimestampExtractor;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class RecordQueueTest {
- private final Serializer<Integer> intSerializer = new IntegerSerializer();
- private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
- private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
- private final RecordQueue queue = new RecordQueue(new TopicPartition("topic", 1), new MockSourceNode<>(intDeserializer, intDeserializer));
-
- private final byte[] recordValue = intSerializer.serialize(null, 10);
- private final byte[] recordKey = intSerializer.serialize(null, 1);
-
- @Test
- public void testTimeTracking() {
-
- assertTrue(queue.isEmpty());
-
- // add three out-of-order records with timestamps 2, 1, 3
- List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue));
-
- queue.addRawRecords(list1, timestampExtractor);
-
- assertEquals(3, queue.size());
- assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
-
- // poll the first record, now with 1, 3
- assertEquals(2L, queue.poll().timestamp);
- assertEquals(2, queue.size());
- assertEquals(1L, queue.timestamp());
-
- // poll the second record, now with 3
- assertEquals(1L, queue.poll().timestamp);
- assertEquals(1, queue.size());
- assertEquals(3L, queue.timestamp());
-
- // add three out-of-order records with timestamps 4, 1, 2
- // now with 3, 4, 1, 2
- List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue));
-
- queue.addRawRecords(list2, timestampExtractor);
-
- assertEquals(4, queue.size());
- assertEquals(3L, queue.timestamp());
-
- // poll the third record, now with 4, 1, 2
- assertEquals(3L, queue.poll().timestamp);
- assertEquals(3, queue.size());
- assertEquals(3L, queue.timestamp());
-
- // poll the remaining records
- assertEquals(4L, queue.poll().timestamp);
- assertEquals(3L, queue.timestamp());
-
- assertEquals(1L, queue.poll().timestamp);
- assertEquals(3L, queue.timestamp());
-
- assertEquals(2L, queue.poll().timestamp);
- assertEquals(0, queue.size());
- assertEquals(3L, queue.timestamp());
-
- // add three more records with timestamps 4, 5, 6
- List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue));
-
- queue.addRawRecords(list3, timestampExtractor);
-
- assertEquals(3, queue.size());
- assertEquals(3L, queue.timestamp());
-
- // poll one record again; the timestamp should advance now
- assertEquals(4L, queue.poll().timestamp);
- assertEquals(2, queue.size());
- assertEquals(5L, queue.timestamp());
- }
-}
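
The RecordQueueTest removed above pins down the queue's partition-time rule: the reported timestamp starts as NOT_KNOWN, and each poll advances it to the smallest timestamp still buffered, never moving backwards even when late out-of-order records arrive afterwards. The sketch below is a simplified model of just that rule, not the deleted RecordQueue or TimestampTracker code; the class and field names are illustrative only.

import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the partition-time behavior asserted in testTimeTracking.
public class TimestampedQueueModel {

    public static final long NOT_KNOWN = -1L;

    // record timestamps in arrival (offset) order
    private final Deque<Long> buffered = new ArrayDeque<>();
    private long partitionTime = NOT_KNOWN;

    public void add(long timestamp) {
        buffered.addLast(timestamp);
    }

    // Remove the oldest record; partition time advances to the minimum timestamp
    // still buffered, but never decreases.
    public long poll() {
        long polled = buffered.removeFirst();
        long minRemaining = buffered.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(partitionTime);
        if (minRemaining > partitionTime) {
            partitionTime = minRemaining;
        }
        return polled;
    }

    public long timestamp() {
        return partitionTime;
    }

    public int size() {
        return buffered.size();
    }
}

Feeding this model the same inputs as the test (add 2, 1, 3; poll twice; add 4, 1, 2; poll four times; add 4, 5, 6; poll once) yields the same sequence of timestamp() values asserted above, ending at 5.
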
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
deleted file mode 100644
index a95c2fa..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.test.MockSourceNode;
-import org.junit.Test;
-import org.junit.Before;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class StreamTaskTest {
-
- private final Serializer<Integer> intSerializer = new IntegerSerializer();
- private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
- private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
-
- private final TopicPartition partition1 = new TopicPartition("topic1", 1);
- private final TopicPartition partition2 = new TopicPartition("topic2", 1);
- private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
-
- private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
- private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
- private final ProcessorTopology topology = new ProcessorTopology(
- Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
- new HashMap<String, SourceNode>() {
- {
- put("topic1", source1);
- put("topic2", source2);
- }
- },
- Collections.<StateStoreSupplier>emptyList()
- );
-
- private StreamingConfig createConfig(final File baseDir) throws Exception {
- return new StreamingConfig(new Properties() {
- {
- setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
- setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
- setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
- }
- });
- }
-
- private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
- private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-
- private final byte[] recordValue = intSerializer.serialize(null, 10);
- private final byte[] recordKey = intSerializer.serialize(null, 1);
-
-
- @Before
- public void setup() {
- consumer.assign(Arrays.asList(partition1, partition2));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testProcessOrder() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- StreamingConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(new TaskId(0, 0), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
-
- task.addRecords(partition1, records(
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue)
- ));
-
- task.addRecords(partition2, records(
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue)
- ));
-
- assertEquals(5, task.process());
- assertEquals(1, source1.numReceived);
- assertEquals(0, source2.numReceived);
-
- assertEquals(4, task.process());
- assertEquals(1, source1.numReceived);
- assertEquals(1, source2.numReceived);
-
- assertEquals(3, task.process());
- assertEquals(2, source1.numReceived);
- assertEquals(1, source2.numReceived);
-
- assertEquals(2, task.process());
- assertEquals(3, source1.numReceived);
- assertEquals(1, source2.numReceived);
-
- assertEquals(1, task.process());
- assertEquals(3, source1.numReceived);
- assertEquals(2, source2.numReceived);
-
- assertEquals(0, task.process());
- assertEquals(3, source1.numReceived);
- assertEquals(3, source2.numReceived);
-
- task.close();
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPauseResume() throws Exception {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- StreamingConfig config = createConfig(baseDir);
- StreamTask task = new StreamTask(new TaskId(1, 1), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
-
- task.addRecords(partition1, records(
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue)
- ));
-
- task.addRecords(partition2, records(
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue)
- ));
-
- assertEquals(5, task.process());
- assertEquals(1, source1.numReceived);
- assertEquals(0, source2.numReceived);
-
- assertEquals(1, consumer.paused().size());
- assertTrue(consumer.paused().contains(partition2));
-
- task.addRecords(partition1, records(
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue)
- ));
-
- assertEquals(2, consumer.paused().size());
- assertTrue(consumer.paused().contains(partition1));
- assertTrue(consumer.paused().contains(partition2));
-
- assertEquals(7, task.process());
- assertEquals(1, source1.numReceived);
- assertEquals(1, source2.numReceived);
-
- assertEquals(1, consumer.paused().size());
- assertTrue(consumer.paused().contains(partition1));
-
- assertEquals(6, task.process());
- assertEquals(2, source1.numReceived);
- assertEquals(1, source2.numReceived);
-
- assertEquals(0, consumer.paused().size());
-
- task.close();
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
- return Arrays.asList(recs);
- }
-}