kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/2] kafka git commit: KAFKA-3066: Demo Examples for Kafka Streams
Date Fri, 22 Jan 2016 23:25:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a19729fe6 -> c197113a9


http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
new file mode 100644
index 0000000..60b3b96
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+public class WallclockTimestampExtractor implements TimestampExtractor {
+    @Override
+    public long extract(ConsumerRecord<Object, Object> record) {
+        return System.currentTimeMillis();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 777fae5..b2af904 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
+import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
new file mode 100644
index 0000000..1b8cbb8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowedStreamPartitionerTest {
+
+    private String topicName = "topic";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+
+    private List<PartitionInfo> infos = Arrays.asList(
+            new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
+    );
+
+    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+
+    @Test
+    public void testCopartitioning() {
+
+        Random rand = new Random();
+
+        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
+
+        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
+        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
+
+        for (int k = 0; k < 10; k++) {
+            Integer key = rand.nextInt();
+            byte[] keyBytes = keySerializer.serialize(topicName, key);
+
+            String value = key.toString();
+            byte[] valueBytes = valSerializer.serialize(topicName, value);
+
+            Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
+
+            for (int w = 0; w < 10; w++) {
+                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
+
+                Windowed<Integer> windowedKey = new Windowed<>(key, window);
+                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
+
+                assertEquals(expected, actual);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
deleted file mode 100644
index 18494fd..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-public class WindowedStreamsPartitionerTest {
-
-    private String topicName = "topic";
-
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-
-    private List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
-    @Test
-    public void testCopartitioning() {
-
-        Random rand = new Random();
-
-        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
-
-        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
-        WindowedStreamsPartitioner<Integer, String> streamPartitioner = new WindowedStreamsPartitioner<>(windowedSerializer);
-
-        for (int k = 0; k < 10; k++) {
-            Integer key = rand.nextInt();
-            byte[] keyBytes = keySerializer.serialize(topicName, key);
-
-            String value = key.toString();
-            byte[] valueBytes = valSerializer.serialize(topicName, value);
-
-            Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
-
-            for (int w = 0; w < 10; w++) {
-                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
-
-                Windowed<Integer> windowedKey = new Windowed<>(key, window);
-                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
-
-                assertEquals(expected, actual);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 60bd309..cb6ea05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -194,8 +194,8 @@ public class ProcessorTopologyTest {
         assertNull(driver.readOutput(topic));
     }
 
-    protected <K, V> StreamsPartitioner<K, V> constantPartitioner(final Integer partition) {
-        return new StreamsPartitioner<K, V>() {
+    protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) {
+        return new StreamPartitioner<K, V>() {
             @Override
             public Integer partition(K key, V value, int numPartitions) {
                 return partition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index fd604b6..ffcf9ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -93,8 +93,8 @@ public class StandbyTaskTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, "standby-task-test");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
             }
@@ -200,7 +200,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(baseDir, taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();
 
@@ -298,7 +298,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(baseDir, taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2d531bc..039cb96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -59,8 +59,9 @@ import java.util.UUID;
 
 public class StreamThreadTest {
 
-    private String clientId = "clientId";
-    private UUID processId = UUID.randomUUID();
+    private final String clientId = "clientId";
+    private final String jobId = "stream-thread-test";
+    private final UUID processId = UUID.randomUUID();
 
     private TopicPartition t1p1 = new TopicPartition("topic1", 1);
     private TopicPartition t1p2 = new TopicPartition("topic1", 2);
@@ -117,8 +118,8 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-thread-test");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             }
         };
@@ -128,13 +129,14 @@ public class StreamThreadTest {
         public boolean committed = false;
 
         public TestStreamTask(TaskId id,
+                              String jobId,
                               Collection<TopicPartition> partitions,
                               ProcessorTopology topology,
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamsConfig config) {
-            super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null);
+            super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
         @Override
@@ -161,11 +163,11 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build(id.topicGroupId);
-                return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -264,10 +266,12 @@ public class StreamThreadTest {
 
             StreamsConfig config = new StreamsConfig(props);
 
-            File stateDir1 = new File(baseDir, task1.toString());
-            File stateDir2 = new File(baseDir, task2.toString());
-            File stateDir3 = new File(baseDir, task3.toString());
-            File extraDir = new File(baseDir, "X");
+            File jobDir = new File(baseDir, jobId);
+            jobDir.mkdir();
+            File stateDir1 = new File(jobDir, task1.toString());
+            File stateDir2 = new File(jobDir, task2.toString());
+            File stateDir3 = new File(jobDir, task3.toString());
+            File extraDir = new File(jobDir, "X");
             stateDir1.mkdir();
             stateDir2.mkdir();
             stateDir3.mkdir();
@@ -281,7 +285,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -290,7 +294,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
@@ -403,7 +407,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -412,7 +416,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 36e487b..1e9c3ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
@@ -249,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> {
             }
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
-                                    StreamsPartitioner<K1, V1> partitioner) {
+                                    StreamPartitioner<K1, V1> partitioner) {
                 recordFlushed(record.key(), record.value());
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 8f8e00f..2dc567e 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.StreamsPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -130,7 +130,7 @@ public class KStreamTestDriver {
         
         @Override
         public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                                StreamsPartitioner<K, V> partitioner) {
+                                StreamPartitioner<K, V> partitioner) {
             // The serialization is skipped.
             process(record.topic(), record.key(), record.value());
         }


Mime
View raw message