kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [1/3] kafka git commit: KAFKA-4923: Add Exactly-Once Semantics to Streams
Date: Wed, 17 May 2017 00:23:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 670193f22 -> ebc7f7caa
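
For context before the diff itself: KAFKA-4923 adds an exactly-once processing guarantee to Kafka Streams, switched on through StreamsConfig.PROCESSING_GUARANTEE_CONFIG, which the tests below exercise via configProps(true). The following is a minimal, illustrative sketch of how an application would opt in; it is not part of this commit, and the topic names and broker address are assumptions.

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class ExactlyOnceExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-example");       // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        // The switch introduced by this feature: request exactly-once processing.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");                    // hypothetical topics

        new KafkaStreams(builder, props).start();
    }
}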


http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 7abe4dd..e5b96ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.producer.MockProducer;
@@ -26,6 +27,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -34,13 +36,13 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -79,8 +81,14 @@ public class StreamThreadTest {
 
     private final String clientId = "clientId";
     private final String applicationId = "stream-thread-test";
-    private final MockTime time = new MockTime();
+    private final MockTime mockTime = new MockTime();
+    private final Metrics metrics = new Metrics();
+    private final MockClientSupplier clientSupplier = new MockClientSupplier();
     private UUID processId = UUID.randomUUID();
+    final KStreamBuilder builder = new KStreamBuilder();
+    private final StreamsConfig config = new StreamsConfig(configProps(false));
+
+
 
     @Before
     public void setUp() throws Exception {
@@ -106,8 +114,12 @@ public class StreamThreadTest {
         new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private final Cluster metadata = new Cluster("cluster", Collections.singleton(Node.noNode()), infos, Collections.<String>emptySet(),
-            Collections.<String>emptySet());
+    private final Cluster metadata = new Cluster(
+        "cluster",
+        Collections.singleton(Node.noNode()),
+        infos,
+        Collections.<String>emptySet(),
+        Collections.<String>emptySet());
 
     private final PartitionAssignor.Subscription subscription =
         new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), subscriptionUserData());
@@ -135,7 +147,7 @@ public class StreamThreadTest {
     private final TaskId task4 = new TaskId(1, 1);
     private final TaskId task5 = new TaskId(1, 2);
 
-    private Properties configProps() {
+    private Properties configProps(final boolean enableEos) {
         return new Properties() {
             {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@@ -143,6 +155,9 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
+                if (enableEos) {
+                    setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+                }
             }
         };
     }
@@ -162,13 +177,23 @@ public class StreamThreadTest {
                        final StreamsConfig config,
                        final StreamsMetrics metrics,
                        final StateDirectory stateDirectory) {
-            super(id, applicationId, partitions, topology, consumer, new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000), config, metrics,
-                  stateDirectory, null, new MockTime(), new RecordCollectorImpl(producer, id.toString()));
+            super(id,
+                applicationId,
+                partitions,
+                topology,
+                consumer,
+                new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000),
+                config,
+                metrics,
+                stateDirectory,
+                null,
+                new MockTime(),
+                producer);
         }
 
         @Override
-        public void commit() {
-            super.commit();
+        void commitImpl(final boolean startNewTransaction) {
+            super.commitImpl(startNewTransaction);
             committed = true;
         }
 
@@ -176,9 +201,9 @@ public class StreamThreadTest {
         protected void updateOffsetLimits() {}
 
         @Override
-        public void close() {
+        public void close(final boolean clean) {
+            super.close(clean);
             closed = true;
-            super.close();
         }
 
         @Override
@@ -192,32 +217,44 @@ public class StreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testPartitionAssignmentChange() throws Exception {
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final StateListenerStub stateListener = new StateListenerStub();
-
-
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-
-        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
-        final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
-
                 final ProcessorTopology topology = builder.build(id.topicGroupId);
-                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
-                    mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+                return new TestStreamTask(
+                    id,
+                    applicationId,
+                    partitionsForTask,
+                    topology,
+                    consumer,
+                    clientSupplier.getProducer(new HashMap()),
+                    restoreConsumer,
+                    config,
+                    new MockStreamsMetrics(new Metrics()),
+                    stateDirectory);
             }
         };
 
+        final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
         assertEquals(thread.state(), StreamThread.State.RUNNING);
-        initPartitionGrouper(config, thread, mockClientSupplier);
+        initPartitionGrouper(config, thread, clientSupplier);
 
         final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
 
@@ -336,7 +373,6 @@ public class StreamThreadTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testHandingOverTaskFromOneToAnotherThread() throws Exception {
-        final TopologyBuilder builder = new TopologyBuilder();
         builder.addStateStore(
             Stores
                 .create("store")
@@ -346,12 +382,31 @@ public class StreamThreadTest {
                 .build()
         );
         builder.addSource("source", TOPIC);
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
-        mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
 
-        final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
-        final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        clientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
+
+        final StreamThread thread1 = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId + 1,
+            processId,
+            metrics,
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+        final StreamThread thread2 = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId + 2,
+            processId,
+            metrics,
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment);
         final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment);
@@ -417,17 +472,24 @@ public class StreamThreadTest {
         Map<TaskId, Set<TopicPartition>> activeTasks() {
             return activeTaskAssignment;
         }
+
+        @Override
+        public void close() {}
     }
 
     @Test
     public void testMetrics() throws Exception {
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("MetricsApp");
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
-
-        final Metrics metrics = new Metrics();
-        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-            clientId,  processId, metrics, new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
         final String defaultGroupName = "stream-metrics";
         final String defaultPrefix = "thread." + thread.threadClientId();
         final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
@@ -462,31 +524,34 @@ public class StreamThreadTest {
         final File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final long cleanupDelay = 1000L;
-            final Properties props = configProps();
+            final Properties props = configProps(false);
             props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
             props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-
             final StreamsConfig config = new StreamsConfig(props);
-
             final File applicationDir = new File(baseDir, applicationId);
             applicationDir.mkdir();
             final File stateDir1 = new File(applicationDir, task1.toString());
             final File stateDir2 = new File(applicationDir, task2.toString());
             final File stateDir3 = new File(applicationDir, task3.toString());
-            final File extraDir = new File(applicationDir, "X");
+            final File extraDir = new File(applicationDir, applicationId);
             stateDir1.mkdir();
             stateDir2.mkdir();
             stateDir3.mkdir();
             extraDir.mkdir();
 
-            final MockTime mockTime = new MockTime();
-
-            final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
 
-            final MockClientSupplier mockClientSupplier = new MockClientSupplier();
-            final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                   0) {
+            final StreamThread thread = new StreamThread(
+                builder,
+                config,
+                clientSupplier,
+                applicationId,
+                clientId,
+                processId,
+                metrics,
+                mockTime,
+                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                0) {
 
                 @Override
                 public void maybeClean(final long now) {
@@ -496,15 +561,21 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
                     final ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
-                        mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+                    return new TestStreamTask(
+                        id,
+                        applicationId,
+                        partitionsForTask,
+                        topology,
+                        consumer,
+                        clientSupplier.getProducer(new HashMap<String, Object>()),
+                        restoreConsumer,
+                        config,
+                        new MockStreamsMetrics(new Metrics()),
+                        stateDirectory);
                 }
             };
 
-            initPartitionGrouper(config, thread, mockClientSupplier);
-
-            final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
-
+            initPartitionGrouper(config, thread, clientSupplier);
             assertTrue(thread.tasks().isEmpty());
             mockTime.sleep(cleanupDelay);
 
@@ -530,6 +601,7 @@ public class StreamThreadTest {
             assignedPartitions = Arrays.asList(t1p1, t1p2);
             prevTasks = new HashMap<>(thread.tasks());
 
+            final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
             rebalanceListener.onPartitionsRevoked(revokedPartitions);
             rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
@@ -592,7 +664,6 @@ public class StreamThreadTest {
             assertFalse(stateDir2.exists());
             assertFalse(stateDir3.exists());
             assertTrue(extraDir.exists());
-
         } finally {
             Utils.delete(baseDir);
         }
@@ -603,20 +674,25 @@ public class StreamThreadTest {
         final File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final long commitInterval = 1000L;
-            final Properties props = configProps();
+            final Properties props = configProps(false);
             props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
             props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
 
             final StreamsConfig config = new StreamsConfig(props);
 
-            final MockTime mockTime = new MockTime();
-
-            final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
 
-            final MockClientSupplier mockClientSupplier = new MockClientSupplier();
-            final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                   0) {
+            final StreamThread thread = new StreamThread(
+                builder,
+                config,
+                clientSupplier,
+                applicationId,
+                clientId,
+                processId,
+                metrics,
+                mockTime,
+                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                0) {
 
                 @Override
                 public void maybeCommit(final long now) {
@@ -626,12 +702,21 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {
                     final ProcessorTopology topology = builder.build(id.topicGroupId);
-                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer,
-                        mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+                    return new TestStreamTask(
+                        id,
+                        applicationId,
+                        partitionsForTask,
+                        topology,
+                        consumer,
+                        clientSupplier.getProducer(new HashMap<String, Object>()),
+                        restoreConsumer,
+                        config,
+                        new MockStreamsMetrics(new Metrics()),
+                        stateDirectory);
                 }
             };
 
-            initPartitionGrouper(config, thread, mockClientSupplier);
+            initPartitionGrouper(config, thread, clientSupplier);
 
             final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
 
@@ -684,10 +769,9 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierWhenEosDisabled() {
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
+    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
+        builder.addSource("source1", "someTopic");
+
         final StreamThread thread = new StreamThread(
             builder,
             config,
@@ -695,8 +779,8 @@ public class StreamThreadTest {
             applicationId,
             clientId,
             processId,
-            new Metrics(),
-            new MockTime(),
+            metrics,
+            mockTime,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
@@ -718,21 +802,19 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() {
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
-        final Properties properties = configProps();
-        properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-        final StreamsConfig config = new StreamsConfig(properties);
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
+    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() {
+        builder.addSource("source1", "someTopic");
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
             builder,
-            config,
+            new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
             clientId,
             processId,
-            new Metrics(),
-            new MockTime(),
+            metrics,
+            mockTime,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
@@ -757,21 +839,19 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseAllTaskProducers() {
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
-        final Properties properties = configProps();
-        properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-        final StreamsConfig config = new StreamsConfig(properties);
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
+    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
+        builder.addSource("source1", "someTopic");
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
         final StreamThread thread = new StreamThread(
             builder,
-            config,
+            new StreamsConfig(configProps(true)),
             clientSupplier,
             applicationId,
             clientId,
             processId,
-            new Metrics(),
-            new MockTime(),
+            metrics,
+            mockTime,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
@@ -791,10 +871,9 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseThreadProducer() {
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
+    public void shouldCloseThreadProducerOnCloseIfEosDisabled() {
+        builder.addSource("source1", "someTopic");
+
         final StreamThread thread = new StreamThread(
             builder,
             config,
@@ -802,8 +881,8 @@ public class StreamThreadTest {
             applicationId,
             clientId,
             processId,
-            new Metrics(),
-            new MockTime(),
+            metrics,
+            mockTime,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
             0);
 
@@ -822,15 +901,19 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception {
-        final TopologyBuilder builder = new TopologyBuilder();
-        builder.setApplicationId(applicationId)
-                .addSource("name", "topic")
-                .addSink("out", "output");
-
+        builder.addSource("name", "topic").addSink("out", "output");
 
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId,
-                                               clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         thread.setPartitionAssignor(new StreamPartitionAssignor() {
             @Override
@@ -849,11 +932,18 @@ public class StreamThreadTest {
         builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey().count("count-one");
         builder.stream("t2").groupByKey().count("count-two");
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
 
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
@@ -900,11 +990,18 @@ public class StreamThreadTest {
         builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey().count("count-one");
         builder.stream("t2").groupByKey().count("count-two");
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
                                          Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
@@ -964,18 +1061,35 @@ public class StreamThreadTest {
         final KStreamBuilder builder = new KStreamBuilder();
         builder.setApplicationId(applicationId);
         builder.stream(Pattern.compile("t.*")).to("out");
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
 
         final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>();
 
-        final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0) {
+
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 final ProcessorTopology topology = builder.build(id.topicGroupId);
-                final TestStreamTask task = new TestStreamTask(id, applicationId, partitions, topology, consumer,
-                    mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
+                final TestStreamTask task = new TestStreamTask(
+                    id,
+                    applicationId,
+                    partitions,
+                    topology,
+                    consumer,
+                    clientSupplier.getProducer(new HashMap<String, Object>()),
+                    restoreConsumer,
+                    config,
+                    new MockStreamsMetrics(new Metrics()),
+                    stateDirectory);
                 createdTasks.put(partitions, task);
                 return task;
             }
@@ -1024,43 +1138,177 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
+        builder.addSource("source", TOPIC).addSink("sink", "dummyTopic", "source");
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
+        final StreamThread thread = new StreamThread(
+            builder,
+            new StreamsConfig(configProps(true)),
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        final MockConsumer consumer = clientSupplier.consumer;
+        consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null)));
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(task1, task0Assignment);
+
+        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        assertThat(thread.tasks().size(), equalTo(1));
+        final MockProducer producer = clientSupplier.producers.get(0);
+
+        thread.start();
+
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return !consumer.subscription().isEmpty();
+                }
+            },
+            "StreamsThread's internal consumer did not subscribe to input topic.");
+
+        // change consumer subscription from "pattern" to "manual" to be able to call .addRecords()
+        consumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() {
+            {
+                put(task0Assignment.iterator().next(), 0L);
+            }
+        });
+        consumer.unsubscribe();
+        consumer.assign(task0Assignment);
+
+        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0]));
+        mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1);
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return producer.history().size() == 1;
+                }
+            },
+            "StreamsThread did not produce output record.");
+
+        assertFalse(producer.transactionCommitted());
+        mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L);
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return producer.commitCount() == 1;
+                }
+            },
+            "StreamsThread did not commit transaction.");
+
+        producer.fenceProducer();
+        mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L);
+
+        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0]));
+        TestUtils.waitForCondition(
+            new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return thread.tasks().isEmpty();
+                }
+            },
+            "StreamsThread did not remove fenced zombie task.");
+
+        thread.close();
+        thread.join();
+
+        assertThat(producer.commitCount(), equalTo(1L));
+    }
+
+    @Test
+    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() throws Exception {
+        builder.addSource("name", "topic").addSink("out", "output");
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
+        final StreamThread thread = new StreamThread(
+            builder,
+            new StreamsConfig(configProps(true)),
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            new Metrics(),
+            new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(task1, task0Assignment);
+
+        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        assertThat(thread.tasks().size(), equalTo(1));
+
+        thread.rebalanceListener.onPartitionsRevoked(null);
+        clientSupplier.producers.get(0).fenceProducer();
+        try {
+            thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+            fail("should have thrown " + ProducerFencedException.class.getSimpleName());
+        } catch (final ProducerFencedException e) { }
+
+        assertTrue(thread.tasks().isEmpty());
+    }
+
+    @Test
     public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey();
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
-        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0),
-                                                                 applicationId,
-                                                                 Utils.mkSet(new TopicPartition("t1", 0)),
-                                                                 builder.build(0),
-                                                                 clientSupplier.consumer,
-                                                                 clientSupplier.getProducer(new HashMap()),
-                                                                 clientSupplier.restoreConsumer,
-                                                                 config,
-                                                                 new MockStreamsMetrics(new Metrics()),
-                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) {
+
+        final TestStreamTask testStreamTask = new TestStreamTask(
+            new TaskId(0, 0),
+            applicationId,
+            Utils.mkSet(new TopicPartition("t1", 0)),
+            builder.build(0),
+            clientSupplier.consumer,
+            clientSupplier.getProducer(new HashMap<String, Object>()),
+            clientSupplier.restoreConsumer,
+            config,
+            new MockStreamsMetrics(new Metrics()),
+            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) {
+
             @Override
-            public void close() {
+            public void close(final boolean clean) {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        final StreamsConfig config1 = new StreamsConfig(configProps());
 
-        final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(),
-                                                     new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0) {
+
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;
             }
         };
 
-
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         activeTasks.put(testStreamTask.id(), testStreamTask.partitions);
 
-
         thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
 
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
@@ -1070,37 +1318,42 @@ public class StreamThreadTest {
         thread.close();
         thread.join();
         assertFalse("task shouldn't have been committed as there was an exception during shutdown", testStreamTask.committed);
-
-
     }
 
     @Test
     public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-        builder.setApplicationId(applicationId);
         final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("foo", false);
         builder.stream("t1").groupByKey().count(new MockStateStoreSupplier(stateStore));
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
-        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0),
-                                                                 applicationId,
-                                                                 Utils.mkSet(new TopicPartition("t1", 0)),
-                                                                 builder.build(0),
-                                                                 clientSupplier.consumer,
-                                                                 clientSupplier.getProducer(new HashMap()),
-                                                                 clientSupplier.restoreConsumer,
-                                                                 config,
-                                                                 new MockStreamsMetrics(new Metrics()),
-                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) {
+        final TestStreamTask testStreamTask = new TestStreamTask(
+            new TaskId(0, 0),
+            applicationId,
+            Utils.mkSet(new TopicPartition("t1", 0)),
+            builder.build(0),
+            clientSupplier.consumer,
+            clientSupplier.getProducer(new HashMap<String, Object>()),
+            clientSupplier.restoreConsumer,
+            config,
+            new MockStreamsMetrics(metrics),
+            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) {
+
             @Override
             public void flushState() {
                 throw new RuntimeException("KABOOM!");
             }
         };
 
-        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(),
-                                                     new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0) {
+
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;
@@ -1132,28 +1385,37 @@ public class StreamThreadTest {
         final KStreamBuilder builder = new KStreamBuilder();
         builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey();
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
-        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0),
-                                                                 applicationId,
-                                                                 Utils.mkSet(new TopicPartition("t1", 0)),
-                                                                 builder.build(0),
-                                                                 clientSupplier.consumer,
-                                                                 clientSupplier.getProducer(new HashMap()),
-                                                                 clientSupplier.restoreConsumer,
-                                                                 config,
-                                                                 new MockStreamsMetrics(new Metrics()),
-                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) {
+
+        final TestStreamTask testStreamTask = new TestStreamTask(
+            new TaskId(0, 0),
+            applicationId,
+            Utils.mkSet(new TopicPartition("t1", 0)),
+            builder.build(0),
+            clientSupplier.consumer,
+            clientSupplier.getProducer(new HashMap<String, Object>()),
+            clientSupplier.restoreConsumer,
+            config,
+            new MockStreamsMetrics(new Metrics()),
+            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) {
+
             @Override
             public void suspend() {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        final StreamsConfig config1 = new StreamsConfig(configProps());
 
-        final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(),
-                                                     new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0) {
+
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;
@@ -1183,28 +1445,36 @@ public class StreamThreadTest {
         final KStreamBuilder builder = new KStreamBuilder();
         builder.setApplicationId(applicationId);
         builder.stream("t1").groupByKey();
-        final StreamsConfig config = new StreamsConfig(configProps());
-        final MockClientSupplier clientSupplier = new MockClientSupplier();
-        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0),
-                                                                 applicationId,
-                                                                 Utils.mkSet(new TopicPartition("t1", 0)),
-                                                                 builder.build(0),
-                                                                 clientSupplier.consumer,
-                                                                 clientSupplier.getProducer(new HashMap()),
-                                                                 clientSupplier.restoreConsumer,
-                                                                 config,
-                                                                 new MockStreamsMetrics(new Metrics()),
-                                                                 new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) {
+
+        final TestStreamTask testStreamTask = new TestStreamTask(
+            new TaskId(0, 0),
+            applicationId,
+            Utils.mkSet(new TopicPartition("t1", 0)),
+            builder.build(0),
+            clientSupplier.consumer,
+            clientSupplier.getProducer(new HashMap<String, Object>()),
+            clientSupplier.restoreConsumer,
+            config,
+            new MockStreamsMetrics(new Metrics()),
+            new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) {
+
             @Override
             protected void flushState() {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        final StreamsConfig config1 = new StreamsConfig(configProps());
 
-        final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId,
-                                                     clientId, processId, new Metrics(), new MockTime(),
-                                                     new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            mockTime,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;
@@ -1227,11 +1497,12 @@ public class StreamThreadTest {
             // expected
         }
         assertFalse(testStreamTask.committed);
-
     }
 
+    private void initPartitionGrouper(final StreamsConfig config,
+                                      final StreamThread thread,
+                                      final MockClientSupplier clientSupplier) {
 
-    private void initPartitionGrouper(final StreamsConfig config, final StreamThread thread, final MockClientSupplier clientSupplier) {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
         partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index f8b17b2..bf55b47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -27,20 +27,19 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -61,7 +60,6 @@ import static org.junit.Assert.assertEquals;
 
 public class StreamThreadStateStoreProviderTest {
 
-    private StreamThread thread;
     private StreamTask taskOne;
     private StreamTask taskTwo;
     private StreamThreadStateStoreProvider provider;
@@ -113,23 +111,29 @@ public class StreamThreadStateStoreProviderTest {
                   taskTwo);
 
         storesAvailable = true;
-        thread = new StreamThread(builder, streamsConfig, clientSupplier,
-                                  applicationId,
-                                  "clientId", UUID.randomUUID(), new Metrics(),
-                                  Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                  0) {
-            @Override
-            public Map<TaskId, StreamTask> tasks() {
-                return tasks;
-            }
-
-            @Override
-            public boolean isInitialized() {
-                return storesAvailable;
-            }
-        };
-        provider = new StreamThreadStateStoreProvider(thread);
-
+        provider = new StreamThreadStateStoreProvider(
+            new StreamThread(
+                builder,
+                streamsConfig,
+                clientSupplier,
+                applicationId,
+                "clientId",
+                UUID.randomUUID(),
+                new Metrics(),
+                Time.SYSTEM,
+                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                0) {
+
+                @Override
+                public Map<TaskId, StreamTask> tasks() {
+                    return tasks;
+                }
+
+                @Override
+                public boolean isInitialized() {
+                    return storesAvailable;
+                }
+            });
     }
 
     @After
@@ -139,7 +143,7 @@ public class StreamThreadStateStoreProviderTest {
     
     @Test
     public void shouldFindKeyValueStores() throws Exception {
-        List<ReadOnlyKeyValueStore<String, String>> kvStores =
+        final List<ReadOnlyKeyValueStore<String, String>> kvStores =
             provider.stores("kv-store", QueryableStoreTypes.<String, String>keyValueStore());
         assertEquals(2, kvStores.size());
     }
@@ -188,15 +192,22 @@ public class StreamThreadStateStoreProviderTest {
                                          final MockClientSupplier clientSupplier,
                                          final ProcessorTopology topology,
                                          final TaskId taskId) {
-        return new StreamTask(taskId, applicationId, Collections
-                .singletonList(new TopicPartition(topicName, taskId.partition)), topology,
-                              clientSupplier.consumer,
-                              new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000),
-                              streamsConfig, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), new NoOpRecordCollector()) {
-            @Override
-            protected void updateOffsetLimits() {
+        return new StreamTask(
+            taskId,
+            applicationId,
+            Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
+            topology,
+            clientSupplier.consumer,
+            new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000),
+            streamsConfig,
+            new MockStreamsMetrics(new Metrics()),
+            stateDirectory,
+            null,
+            new MockTime(),
+            clientSupplier.getProducer(new HashMap<String, Object>())) {
 
-            }
+            @Override
+            protected void updateOffsetLimits() {}
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 531fdb6..da5ab3b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.streams.KafkaClientSupplier;
 
@@ -28,17 +29,35 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+
 public class MockClientSupplier implements KafkaClientSupplier {
+    private final String applicationId;
     private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
 
     public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
 
-    public final List<Producer> producers = new LinkedList<>();
+    public final List<MockProducer> producers = new LinkedList<>();
+
+    public MockClientSupplier() {
+        this(null);
+    }
+
+    public MockClientSupplier(final String applicationId) {
+        this.applicationId = applicationId;
+    }
 
     @Override
     public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-        final Producer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
+        if (applicationId != null) {
+            assertThat((String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), startsWith(applicationId + "-"));
+        } else {
+            assertFalse(config.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+        }
+        final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
         producers.add(producer);
         return producer;
     }
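
A minimal sketch of how the new applicationId-aware supplier is meant to be exercised; the config it checks mirrors the assertions above, while the application id and transactional id values here are illustrative only:

    final MockClientSupplier eosSupplier = new MockClientSupplier("my-app");
    final Map<String, Object> eosConfig = new HashMap<>();
    eosConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-0_0");  // must start with "my-app-"
    eosSupplier.getProducer(eosConfig);                                   // assertion passes

    final MockClientSupplier plainSupplier = new MockClientSupplier();
    plainSupplier.getProducer(new HashMap<String, Object>());             // passes: no transactional id expected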

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 05175f9..66271a0 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -45,17 +45,14 @@ public class NoOpRecordCollector implements RecordCollector {
                             final StreamPartitioner<? super K, ? super V> partitioner) {}
 
     @Override
-    public void flush() {
-        //no-op
-    }
+    public void flush() {}
 
     @Override
-    public void close() {
-        //no-op
-    }
+    public void close() {}
 
     @Override
     public Map<TopicPartition, Long> offsets() {
         return Collections.emptyMap();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 5d46ce0..a9f020b 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -16,17 +16,7 @@
  */
 package org.apache.kafka.test;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -50,19 +40,28 @@ import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
-
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This class makes it easier to write tests to verify the behavior of topologies created with a {@link TopologyBuilder}.
@@ -140,21 +139,16 @@ import java.io.IOException;
  */
 public class ProcessorTopologyTestDriver {
 
-    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
-
     private final static String APPLICATION_ID = "test-driver-application";
     private final static int PARTITION_ID = 0;
     private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
 
     private final ProcessorTopology topology;
-    private final MockConsumer<byte[], byte[]> consumer;
     private final MockProducer<byte[], byte[]> producer;
-    private final MockConsumer<byte[], byte[]> restoreStateConsumer;
     private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
     private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
     private final Set<String> internalTopics = new HashSet<>();
-    private final ProcessorTopology globalTopology;
     private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
     private StreamTask task;
     private GlobalStateUpdateTask globalStateTask;
@@ -164,28 +158,29 @@ public class ProcessorTopologyTestDriver {
      * @param config the stream configuration for the topology
      * @param builder the topology builder that will be used to create the topology instance
      */
-    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder) {
+    public ProcessorTopologyTestDriver(final StreamsConfig config,
+                                       final TopologyBuilder builder) {
         topology = builder.setApplicationId(APPLICATION_ID).build(null);
-        globalTopology  = builder.buildGlobalStateTopology();
+        final ProcessorTopology globalTopology  = builder.buildGlobalStateTopology();
 
         // Set up the consumer and producer ...
-        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        final Consumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
         producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
             @Override
-            public List<PartitionInfo> partitionsFor(String topic) {
+            public List<PartitionInfo> partitionsFor(final String topic) {
                 return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null));
             }
         };
-        restoreStateConsumer = createRestoreConsumer(TASK_ID, topology.storeToChangelogTopic());
 
         // Identify internal topics for forwarding in process ...
-        for (TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
+        for (final TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
             internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
         }
 
         // Set up all of the topic+partition information and subscribe the consumer to each ...
-        for (String topic : topology.sourceTopics()) {
-            TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
+        for (final String topic : topology.sourceTopics()) {
+            final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
             partitionsByTopic.put(topic, tp);
             offsetsByTopicPartition.put(tp, new AtomicLong());
         }
@@ -199,7 +194,7 @@ public class ProcessorTopologyTestDriver {
         if (globalTopology != null) {
             final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
             for (final String topicName : globalTopology.sourceTopics()) {
-                List<PartitionInfo> partitionInfos = new ArrayList<>();
+                final List<PartitionInfo> partitionInfos = new ArrayList<>();
                 partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
                 globalConsumer.updatePartitions(topicName, partitionInfos);
                 final TopicPartition partition = new TopicPartition(topicName, 1);
@@ -221,12 +216,15 @@ public class ProcessorTopologyTestDriver {
                                   partitionsByTopic.values(),
                                   topology,
                                   consumer,
-                                  new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000),
+                                  new StoreChangelogReader(
+                                      createRestoreConsumer(topology.storeToChangelogTopic()),
+                                      Time.SYSTEM,
+                                      5000),
                                   config,
                                   streamsMetrics, stateDirectory,
                                   cache,
                                   new MockTime(),
-                                  new RecordCollectorImpl(producer, "id"));
+                                  producer);
         }
     }
 
@@ -238,12 +236,15 @@ public class ProcessorTopologyTestDriver {
      * @param value the raw message value
      * @param timestamp the raw message timestamp
      */
-    private void process(String topicName, byte[] key, byte[] value, long timestamp) {
+    private void process(final String topicName,
+                         final byte[] key,
+                         final byte[] value,
+                         final long timestamp) {
 
-        TopicPartition tp = partitionsByTopic.get(topicName);
+        final TopicPartition tp = partitionsByTopic.get(topicName);
         if (tp != null) {
             // Add the record ...
-            long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
+            final long offset = offsetsByTopicPartition.get(tp).incrementAndGet();
             task.addRecords(tp, records(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, value)));
             producer.clear();
 
@@ -253,7 +254,7 @@ public class ProcessorTopologyTestDriver {
             task.commit();
 
             // Capture all the records sent to the producer ...
-            for (ProducerRecord<byte[], byte[]> record : producer.history()) {
+            for (final ProducerRecord<byte[], byte[]> record : producer.history()) {
                 Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic());
                 if (outputRecords == null) {
                     outputRecords = new LinkedList<>();
@@ -284,7 +285,9 @@ public class ProcessorTopologyTestDriver {
      * @param key the raw message key
      * @param value the raw message value
      */
-    public void process(String topicName, byte[] key, byte[] value) {
+    public void process(final String topicName,
+                        final byte[] key,
+                        final byte[] value) {
         process(topicName, key, value, 0L);
     }
 
@@ -297,7 +300,11 @@ public class ProcessorTopologyTestDriver {
      * @param keySerializer the serializer for the key
      * @param valueSerializer the serializer for the value
      */
-    public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+    public <K, V> void process(final String topicName,
+                               final K key,
+                               final V value,
+                               final Serializer<K> keySerializer,
+                               final Serializer<V> valueSerializer) {
         process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value));
     }
 
@@ -308,9 +315,11 @@ public class ProcessorTopologyTestDriver {
      * @param topic the name of the topic
      * @return the next record on that topic, or null if there is no record available
      */
-    public ProducerRecord<byte[], byte[]> readOutput(String topic) {
-        Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
-        if (outputRecords == null) return null;
+    public ProducerRecord<byte[], byte[]> readOutput(final String topic) {
+        final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic);
+        if (outputRecords == null) {
+            return null;
+        }
         return outputRecords.poll();
     }
 
@@ -323,15 +332,19 @@ public class ProcessorTopologyTestDriver {
      * @param valueDeserializer the deserializer for the value type
      * @return the next record on that topic, or null if there is no record available
      */
-    public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
-        ProducerRecord<byte[], byte[]> record = readOutput(topic);
-        if (record == null) return null;
-        K key = keyDeserializer.deserialize(record.topic(), record.key());
-        V value = valueDeserializer.deserialize(record.topic(), record.value());
-        return new ProducerRecord<K, V>(record.topic(), record.partition(), record.timestamp(), key, value);
+    public <K, V> ProducerRecord<K, V> readOutput(final String topic,
+                                                  final Deserializer<K> keyDeserializer,
+                                                  final Deserializer<V> valueDeserializer) {
+        final ProducerRecord<byte[], byte[]> record = readOutput(topic);
+        if (record == null) {
+            return null;
+        }
+        final K key = keyDeserializer.deserialize(record.topic(), record.key());
+        final V value = valueDeserializer.deserialize(record.topic(), record.value());
+        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
     }
 
-    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> record) {
+    private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]> record) {
         return Collections.singleton(record);
     }
 
@@ -347,7 +360,7 @@ public class ProcessorTopologyTestDriver {
      * @return the state store, or null if no store has been registered with the given name
      * @see #getKeyValueStore(String)
      */
-    public StateStore getStateStore(String name) {
+    public StateStore getStateStore(final String name) {
         return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
     }
 
@@ -365,8 +378,8 @@ public class ProcessorTopologyTestDriver {
      * @see #getStateStore(String)
      */
     @SuppressWarnings("unchecked")
-    public <K, V> KeyValueStore<K, V> getKeyValueStore(String name) {
-        StateStore store = getStateStore(name);
+    public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
+        final StateStore store = getStateStore(name);
         return store instanceof KeyValueStore ? (KeyValueStore<K, V>) getStateStore(name) : null;
     }
 
@@ -375,12 +388,12 @@ public class ProcessorTopologyTestDriver {
      */
     public void close() {
         if (task != null) {
-            task.close();
+            task.close(true);
         }
         if (globalStateTask != null) {
             try {
                 globalStateTask.close();
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 // ignore
             }
         }
@@ -390,35 +403,29 @@ public class ProcessorTopologyTestDriver {
      * Utility method that creates the {@link MockConsumer} used for restoring state, which should not be done by this
      * driver object unless this method is overwritten with a functional consumer.
      *
-     * @param id the ID of the stream task
      * @param storeToChangelogTopic the map of the names of the stores to the changelog topics
      * @return the mock consumer; never null
      */
-    protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, Map<String, String> storeToChangelogTopic) {
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
+    private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) {
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
             @Override
-            public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
-                // do nothing ...
-            }
+            public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
 
             @Override
-            public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
-                // do nothing ...
-            }
+            public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
 
             @Override
-            public synchronized long position(TopicPartition partition) {
-                // do nothing ...
+            public synchronized long position(final TopicPartition partition) {
                 return 0L;
             }
         };
         // For each store ...
-        for (Map.Entry<String, String> storeAndTopic: storeToChangelogTopic.entrySet()) {
-            String topicName = storeAndTopic.getValue();
+        for (final Map.Entry<String, String> storeAndTopic: storeToChangelogTopic.entrySet()) {
+            final String topicName = storeAndTopic.getValue();
             // Set up the restore-state topic ...
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
-            List<PartitionInfo> partitionInfos = new ArrayList<>();
+            final List<PartitionInfo> partitionInfos = new ArrayList<>();
             partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null));
             consumer.updatePartitions(topicName, partitionInfos);
             consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L));
@@ -426,25 +433,19 @@ public class ProcessorTopologyTestDriver {
         return consumer;
     }
 
-    protected MockConsumer<byte[], byte[]> createGlobalConsumer() {
-        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
+    private MockConsumer<byte[], byte[]> createGlobalConsumer() {
+        return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
             @Override
-            public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
-                // do nothing ...
-            }
+            public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {}
 
             @Override
-            public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
-                // do nothing ...
-            }
+            public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
 
             @Override
-            public synchronized long position(TopicPartition partition) {
-                // do nothing ...
+            public synchronized long position(final TopicPartition partition) {
                 return 0L;
             }
         };
-
-        return consumer;
     }
+
 }
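
For readers skimming the diff, a minimal usage sketch of the driver based on the public methods shown above; the config, builder, and topic names are placeholders rather than values from the commit:

    final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
    driver.process("input-topic", "key", "value", new StringSerializer(), new StringSerializer());
    final ProducerRecord<String, String> output =
        driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
    // output is null if the topology wrote nothing to that topic
    driver.close();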

