kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3616: Make kafka producers/consumers injectable for KafkaStreams
Date Fri, 06 May 2016 15:49:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 78de891ac -> fec2d41bc


KAFKA-3616: Make kafka producers/consumers injectable for KafkaStreams

Ticket: https://issues.apache.org/jira/browse/KAFKA-3616

Author: Yuto Kawamura <kawamuray.dadada@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1264 from kawamuray/kafka-3616-inject-clients


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fec2d41b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fec2d41b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fec2d41b

Branch: refs/heads/trunk
Commit: fec2d41bc2bed9f4effbd9922ca487bb02e5eeae
Parents: 78de891
Author: Yuto Kawamura <kawamuray.dadada@gmail.com>
Authored: Fri May 6 08:49:44 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri May 6 08:49:44 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/KafkaClientSupplier.java      | 47 ++++++++++++++++
 .../org/apache/kafka/streams/KafkaStreams.java  | 17 +++++-
 .../internals/DefaultKafkaClientSupplier.java   | 45 ++++++++++++++++
 .../processor/internals/StreamThread.java       | 56 +++++---------------
 .../internals/StreamPartitionAssignorTest.java  | 56 ++++----------------
 .../processor/internals/StreamThreadTest.java   | 42 +++++++--------
 .../apache/kafka/test/MockClientSupplier.java   | 52 ++++++++++++++++++
 7 files changed, 201 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fec2d41b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
new file mode 100644
index 0000000..e0312f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+
+public interface KafkaClientSupplier {
+    /**
+     * Creates an instance of Producer which is used to produce records.
+     * @param config producer config which is supplied by the {@link StreamsConfig} given to {@link KafkaStreams}
+     * @return an instance of kafka Producer
+     */
+    Producer<byte[], byte[]> getProducer(Map<String, Object> config);
+
+    /**
+     * Creates an instance of Consumer which is used to consume records of source topics.
+     * @param config consumer config which is supplied by the {@link StreamsConfig} given to {@link KafkaStreams}
+     * @return an instance of kafka Consumer
+     */
+    Consumer<byte[], byte[]> getConsumer(Map<String, Object> config);
+
+    /**
+     * Creates an instance of Consumer which is used to consume records of internal topics.
+     * @param config restore consumer config which is supplied by the {@link StreamsConfig} given to
+     * {@link KafkaStreams}
+     * @return an instance of kafka Consumer
+     */
+    Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fec2d41b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 45024f2..b3e3f5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,7 +105,7 @@ public class KafkaStreams {
      * @param props    properties for the {@link StreamsConfig}
      */
     public KafkaStreams(TopologyBuilder builder, Properties props) {
-        this(builder, new StreamsConfig(props));
+        this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
     }
 
     /**
@@ -114,6 +115,18 @@ public class KafkaStreams {
      * @param config   the stream configs
      */
     public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
+        this(builder, config, new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * Construct the stream instance.
+     *
+     * @param builder         the processor topology builder specifying the computational
logic
+     * @param config          the stream configs
+     * @param clientSupplier  the kafka clients supplier which provides underlying producer
and consumer clients
+     * for this {@link KafkaStreams} instance
+     */
+    public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier
clientSupplier) {
         // create the metrics
         Time time = new SystemTime();
 
@@ -138,7 +151,7 @@ public class KafkaStreams {
 
         this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, applicationId, clientId,
processId, metrics, time);
+            this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId,
clientId, processId, metrics, time);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fec2d41b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
new file mode 100644
index 0000000..be17008
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+
+public class DefaultKafkaClientSupplier implements KafkaClientSupplier {
+    @Override
+    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
+        return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
+    }
+
+    @Override
+    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
+        return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
+
+    @Override
+    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config)
{
+        return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fec2d41b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d4cb78c..72eeef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -22,8 +22,6 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
@@ -35,10 +33,9 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -151,24 +148,12 @@ public class StreamThread extends Thread {
 
     public StreamThread(TopologyBuilder builder,
                         StreamsConfig config,
+                        KafkaClientSupplier clientSupplier,
                         String applicationId,
                         String clientId,
                         UUID processId,
                         Metrics metrics,
                         Time time) {
-        this(builder, config, null , null, null, applicationId, clientId, processId, metrics,
time);
-    }
-
-    StreamThread(TopologyBuilder builder,
-                 StreamsConfig config,
-                 Producer<byte[], byte[]> producer,
-                 Consumer<byte[], byte[]> consumer,
-                 Consumer<byte[], byte[]> restoreConsumer,
-                 String applicationId,
-                 String clientId,
-                 UUID processId,
-                 Metrics metrics,
-                 Time time) {
         super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
 
         this.applicationId = applicationId;
@@ -180,9 +165,16 @@ public class StreamThread extends Thread {
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG,
PartitionGrouper.class);
 
         // set the producer and consumer clients
-        this.producer = (producer != null) ? producer : createProducer();
-        this.consumer = (consumer != null) ? consumer : createConsumer();
-        this.restoreConsumer = (restoreConsumer != null) ? restoreConsumer : createRestoreConsumer();
+        String threadName = getName();
+        String threadClientId = clientId + "-" + threadName;
+        log.info("Creating producer client for stream thread [{}]", threadName);
+        this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
+        log.info("Creating consumer client for stream thread [{}]", threadName);
+        this.consumer = clientSupplier.getConsumer(
+                config.getConsumerConfigs(this, applicationId, threadClientId));
+        log.info("Creating restore consumer client for stream thread [{}]", threadName);
+        this.restoreConsumer = clientSupplier.getRestoreConsumer(
+                config.getRestoreConsumerConfigs(threadClientId));
 
         // initialize the task list
         this.activeTasks = new HashMap<>();
@@ -213,30 +205,6 @@ public class StreamThread extends Thread {
         this.partitionAssignor = partitionAssignor;
     }
 
-    private Producer<byte[], byte[]> createProducer() {
-        String threadName = this.getName();
-        log.info("Creating producer client for stream thread [" + threadName + "]");
-        return new KafkaProducer<>(config.getProducerConfigs(this.clientId + "-" +
threadName),
-                new ByteArraySerializer(),
-                new ByteArraySerializer());
-    }
-
-    private Consumer<byte[], byte[]> createConsumer() {
-        String threadName = this.getName();
-        log.info("Creating consumer client for stream thread [" + threadName + "]");
-        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.applicationId,
this.clientId + "-" + threadName),
-                new ByteArrayDeserializer(),
-                new ByteArrayDeserializer());
-    }
-
-    private Consumer<byte[], byte[]> createRestoreConsumer() {
-        String threadName = this.getName();
-        log.info("Creating restore consumer client for stream thread [" + threadName + "]");
-        return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId +
"-" + threadName),
-                new ByteArrayDeserializer(),
-                new ByteArrayDeserializer());
-    }
-
     /**
      * Execute the stream processors
      * @throws KafkaException for any Kafka-related exceptions

http://git-wip-us.apache.org/repos/asf/kafka/blob/fec2d41b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 3e8b110..17bda54 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -18,15 +18,12 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -34,6 +31,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -98,17 +96,11 @@ public class StreamPartitionAssignorTest {
         };
     }
 
-    private ByteArraySerializer serializer = new ByteArraySerializer();
-
     @SuppressWarnings("unchecked")
     @Test
     public void testSubscription() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
 
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer,
serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
@@ -122,7 +114,7 @@ public class StreamPartitionAssignorTest {
 
         String clientId = "client-id";
         UUID processId = UUID.randomUUID();
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
"test", clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(),
"test", clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             public Set<TaskId> prevTasks() {
                 return prevTasks;
@@ -152,10 +144,6 @@ public class StreamPartitionAssignorTest {
     public void testAssignBasic() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
 
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer,
serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
@@ -173,9 +161,8 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
-        String client2 = "client2";
 
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
"test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(),
"test", client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -224,10 +211,6 @@ public class StreamPartitionAssignorTest {
     public void testAssignWithNewTasks() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
 
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer,
serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
@@ -244,9 +227,8 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
-        String client2 = "client2";
 
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
"test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(),
"test", client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -288,10 +270,6 @@ public class StreamPartitionAssignorTest {
     public void testAssignWithStates() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
 
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer,
serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
         TopologyBuilder builder = new TopologyBuilder();
 
         builder.addSource("source1", "topic1");
@@ -316,9 +294,8 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
-        String client2 = "client2";
 
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
"test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(),
"test", client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -354,10 +331,6 @@ public class StreamPartitionAssignorTest {
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         StreamsConfig config = new StreamsConfig(props);
 
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer,
serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
@@ -376,9 +349,8 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
-        String client2 = "client2";
 
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
"test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(),
"test", client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
@@ -470,10 +442,6 @@ public class StreamPartitionAssignorTest {
     public void testOnAssignment() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
 
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer,
serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
         TopicPartition t2p3 = new TopicPartition("topic2", 3);
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -484,7 +452,7 @@ public class StreamPartitionAssignorTest {
         UUID uuid = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
"test", client1, uuid, new Metrics(), new SystemTime());
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(),
"test", client1, uuid, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
@@ -507,10 +475,6 @@ public class StreamPartitionAssignorTest {
     public void testAssignWithInternalTopics() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
 
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer,
serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
         TopologyBuilder builder = new TopologyBuilder();
         builder.addInternalTopic("topicX");
         builder.addSource("source1", "topic1");
@@ -522,14 +486,14 @@ public class StreamPartitionAssignorTest {
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
"test", client1, uuid1, new Metrics(), new SystemTime());
+        MockClientSupplier clientSupplier = new MockClientSupplier();
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, "test",
client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(mockRestoreConsumer);
+        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fec2d41b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e387a59..4ae31e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -19,27 +19,25 @@ package org.apache.kafka.streams.processor.internals;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
@@ -148,28 +146,22 @@ public class StreamThreadTest {
         }
     }
 
-    private ByteArraySerializer serializer = new ByteArraySerializer();
-
     @SuppressWarnings("unchecked")
     @Test
     public void testPartitionAssignmentChange() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
 
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer,
serializer);
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
         TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
applicationId, clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(),
applicationId, clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition>
partitionsForTask) {
                 ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                return new TestStreamTask(id, applicationId, partitionsForTask, topology,
consumer, producer, mockRestoreConsumer, config);
+                return new TestStreamTask(id, applicationId, partitionsForTask, topology,
consumer, producer, restoreConsumer, config);
             }
         };
 
@@ -279,15 +271,12 @@ public class StreamThreadTest {
             stateDir3.mkdir();
             extraDir.mkdir();
 
-            MockProducer<byte[], byte[]> producer = new MockProducer<>(true,
serializer, serializer);
-            MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-            final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
             MockTime mockTime = new MockTime();
 
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer,
applicationId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(),
applicationId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -296,7 +285,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config);
                 }
             };
 
@@ -401,15 +390,12 @@ public class StreamThreadTest {
 
             StreamsConfig config = new StreamsConfig(props);
 
-            MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
-            MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-            final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
             MockTime mockTime = new MockTime();
 
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -418,7 +404,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config);
                 }
             };
 
@@ -475,6 +461,18 @@ public class StreamThreadTest {
         }
     }
 
+    @Test
+    public void testInjectClients() {
+        TopologyBuilder builder = new TopologyBuilder();
+        StreamsConfig config = new StreamsConfig(configProps());
+        MockClientSupplier clientSupplier = new MockClientSupplier();
+        StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+                                               clientId,  processId, new Metrics(), new MockTime());
+        assertSame(clientSupplier.producer, thread.producer);
+        assertSame(clientSupplier.consumer, thread.consumer);
+        assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
+    }
+
     private void initPartitionGrouper(StreamsConfig config, StreamThread thread) {
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();

http://git-wip-us.apache.org/repos/asf/kafka/blob/fec2d41b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
new file mode 100644
index 0000000..3861ff8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+
+public class MockClientSupplier implements KafkaClientSupplier {
+    private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
+
+    public final MockProducer<byte[], byte[]> producer =
+            new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
+    public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+    @Override
+    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
+        return producer;
+    }
+
+    @Override
+    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
+        return consumer;
+    }
+
+    @Override
+    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config)
{
+        return restoreConsumer;
+    }
+}


Mime
View raw message