kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-6150: KIP-204 part III; Purge repartition topics with the admin client
Date Mon, 04 Dec 2017 18:21:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 39c438ec1 -> 4b8a29f12


http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0166480..c852ae3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -16,15 +16,11 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
@@ -38,7 +34,6 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -48,17 +43,15 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockStateStore;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -68,7 +61,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -89,37 +81,55 @@ public class StreamTaskTest {
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
-    private final String[] topic1 = {"topic1"};
-    private final String[] topic2 = {"topic2"};
-    private final TopicPartition partition1 = new TopicPartition(topic1[0], 1);
-    private final TopicPartition partition2 = new TopicPartition(topic2[0], 1);
+    private final String topic1 = "topic1";
+    private final String topic2 = "topic2";
+    private final TopicPartition partition1 = new TopicPartition(topic1, 1);
+    private final TopicPartition partition2 = new TopicPartition(topic2, 1);
     private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
 
-    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(topic1, intDeserializer, intDeserializer);
-    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(topic2, intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(new String[]{topic1}, intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(new String[]{topic2}, intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(new String[]{topic2}, intDeserializer, intDeserializer) {
+        @Override
+        public void process(Integer key, Integer value) {
+            throw new RuntimeException("KABOOM!");
+        }
+
+        @Override
+        public void close() {
+            throw new RuntimeException("KABOOM!");
+        }
+    };
     private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode<>(10L);
     private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME);
 
-    private final ProcessorTopology topology = new ProcessorTopology(
-            Arrays.<ProcessorNode>asList(source1, source2, processorStreamTime, processorSystemTime),
+    private final String storeName = "store";
+    private final StateStore stateStore = new MockStateStore(storeName, false);
+    private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 0);
+    private final Long offset = 543L;
+
+    private final ProcessorTopology topology = ProcessorTopology.withSources(
+            Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime),
             new HashMap<String, SourceNode>() {
                 {
-                    put(topic1[0], source1);
-                    put(topic2[0], source2);
+                    put(topic1, source1);
+                    put(topic2, source2);
                 }
-            },
-            Collections.<String, SinkNode>emptyMap(),
-            Collections.<StateStore>emptyList(),
-            Collections.<String, String>emptyMap(),
-            Collections.<StateStore>emptyList());
+            }
+    );
+
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test "));
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test ")) {
+        @Override
+        public Map<TopicPartition, Long> restoredOffsets() {
+            return Collections.singletonMap(changelogPartition, offset);
+        }
+    };
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
-    private final String applicationId = "applicationId";
     private final Metrics metrics = new Metrics();
     private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
     private final TaskId taskId00 = new TaskId(0, 0);
@@ -156,23 +166,20 @@ public class StreamTaskTest {
     @Before
     public void setup() throws IOException {
         consumer.assign(Arrays.asList(partition1, partition2));
-        source1.addChild(processorStreamTime);
-        source2.addChild(processorStreamTime);
-        source1.addChild(processorSystemTime);
-        source2.addChild(processorSystemTime);
         config = createConfig(false);
         eosConfig = createConfig(true);
         stateDirectory = new StateDirectory(config, new MockTime());
-        task = new StreamTask(taskId00, partitions, topology, consumer,
-                              changelogReader, config, streamsMetrics, stateDirectory, null, time, producer);
-        task.initialize();
     }
 
     @After
     public void cleanup() throws IOException {
         try {
             if (task != null) {
-                task.close(true, false);
+                try {
+                    task.close(true, false);
+                } catch (Exception e) {
+                    // swallow
+                }
             }
         } finally {
             Utils.delete(baseDir);
@@ -182,6 +189,8 @@ public class StreamTaskTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testProcessOrder() {
+        task = createStatelessTask(false);
+
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -238,6 +247,8 @@ public class StreamTaskTest {
 
     @Test
     public void testMetrics() {
+        task = createStatelessTask(false);
+
         final String name = task.id().toString();
         final Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("task-id", name);
@@ -254,6 +265,8 @@ public class StreamTaskTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testPauseResume() {
+        task = createStatelessTask(false);
+
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
@@ -307,6 +320,9 @@ public class StreamTaskTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testMaybePunctuateStreamTime() {
+        task = createStatelessTask(false);
+        task.initialize();
+
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -370,6 +386,9 @@ public class StreamTaskTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testCancelPunctuateStreamTime() {
+        task = createStatelessTask(false);
+        task.initialize();
+
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -399,6 +418,8 @@ public class StreamTaskTest {
 
     @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
+        task = createStatelessTask(false);
+        task.initialize();
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -411,6 +432,8 @@ public class StreamTaskTest {
 
     @Test
     public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
+        task = createStatelessTask(false);
+        task.initialize();
         long now = time.milliseconds();
         assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
         time.sleep(9);
@@ -420,6 +443,8 @@ public class StreamTaskTest {
 
     @Test
     public void testCancelPunctuateSystemTime() {
+        task = createStatelessTask(false);
+        task.initialize();
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -429,92 +454,42 @@ public class StreamTaskTest {
         processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
-        final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {
-
-            @Override
-            public void process(final Object key, final Object value) {
-                throw new KafkaException("KABOOM!");
-            }
-        };
-
-        final List<ProcessorNode> processorNodes = Collections.<ProcessorNode>singletonList(processorNode);
-        final Map<String, SourceNode> sourceNodes = new HashMap() {
-            {
-                put(topic1[0], processorNode);
-                put(topic2[0], processorNode);
-            }
-        };
-        final ProcessorTopology topology = new ProcessorTopology(processorNodes,
-                                                                 sourceNodes,
-                                                                 Collections.<String, SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>emptyList(),
-                                                                 Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore>emptyList());
-
-        task.close(true, false);
-
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader, config,
-            streamsMetrics, stateDirectory, null, time, producer);
+        task = createTaskThatThrowsException();
         task.initialize();
-        final int offset = 20;
-        task.addRecords(partition1, Collections.singletonList(
-                new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+        task.addRecords(partition2, Collections.singletonList(
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
 
         try {
             task.process();
             fail("Should've thrown StreamsException");
-        } catch (final StreamsException e) {
-            final String message = e.getMessage();
-            assertTrue("message=" + message + " should contain topic", message.contains("topic=" + topic1[0]));
-            assertTrue("message=" + message + " should contain partition", message.contains("partition=" + partition1.partition()));
-            assertTrue("message=" + message + " should contain offset", message.contains("offset=" + offset));
-            assertTrue("message=" + message + " should contain processor", message.contains("processor=" + processorNode.name()));
+        } catch (final Exception e) {
+            assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
         }
     }
 
-    @SuppressWarnings(value = {"unchecked", "deprecation"})
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingDeprecated() {
-        final Processor processor = new AbstractProcessor() {
-            @Override
-            public void init(final ProcessorContext context) {
-                context.schedule(1);
-            }
-
-            @Override
-            public void process(final Object key, final Object value) {}
-
-            @Override
-            public void punctuate(final long timestamp) {
-                throw new KafkaException("KABOOM!");
-            }
-        };
-
-        final ProcessorNode punctuator = new ProcessorNode("test", processor, Collections.<String>emptySet());
-        punctuator.init(new NoOpProcessorContext());
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating() {
+        task = createStatelessTask(false);
+        task.initialize();
 
         try {
-            task.punctuate(punctuator, 1, PunctuationType.STREAM_TIME, new Punctuator() {
+            task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, new Punctuator() {
                 @Override
                 public void punctuate(long timestamp) {
-                    processor.punctuate(timestamp);
+                    throw new KafkaException("KABOOM!");
                 }
             });
             fail("Should've thrown StreamsException");
         } catch (final StreamsException e) {
-            final String message = e.getMessage();
-            assertTrue("message=" + message + " should contain processor", message.contains("processor 'test'"));
             assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
-        final Processor processor = new AbstractProcessor() {
+        final Processor<Object, Object> processor = new AbstractProcessor<Object, Object>() {
             @Override
             public void init(final ProcessorContext context) {
             }
@@ -526,9 +501,12 @@ public class StreamTaskTest {
             public void punctuate(final long timestamp) {}
         };
 
-        final ProcessorNode punctuator = new ProcessorNode("test", processor, Collections.<String>emptySet());
+        final ProcessorNode<Object, Object> punctuator = new ProcessorNode<>("test", processor, Collections.<String>emptySet());
         punctuator.init(new NoOpProcessorContext());
 
+        task = createStatelessTask(false);
+        task.initialize();
+
         try {
             task.punctuate(punctuator, 1, PunctuationType.STREAM_TIME, new Punctuator() {
                 @Override
@@ -538,8 +516,6 @@ public class StreamTaskTest {
             });
             fail("Should've thrown StreamsException");
         } catch (final StreamsException e) {
-            final String message = e.getMessage();
-            assertTrue("message=" + message + " should contain processor", message.contains("processor 'test'"));
             assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
         }
     }
@@ -565,136 +541,32 @@ public class StreamTaskTest {
         assertTrue(flushed.get());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCheckpointOffsetsOnCommit() throws IOException {
-        final String storeName = "test";
-        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName);
-        final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
-            @Override
-            public void init(final ProcessorContext context, final StateStore root) {
-                context.register(root, false, null);
-            }
-
-            @Override
-            public boolean persistent() {
-                return true;
-            }
-        };
-        Map<String, SourceNode> sourceByTopics =  new HashMap() { {
-                put(partition1.topic(), source1);
-                put(partition2.topic(), source2);
-            }
-        };
-        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
-                                                                 sourceByTopics,
-                                                                 Collections.<String, SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>singletonList(inMemoryStore),
-                                                                 Collections.singletonMap(storeName, changelogTopic),
-                                                                 Collections.<StateStore>emptyList());
-
-        final TopicPartition partition = new TopicPartition(changelogTopic, 0);
-
-        restoreStateConsumer.updatePartitions(changelogTopic,
-                                              Collections.singletonList(
-                                                      new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0])));
-        restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
-        restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
-
-        final long offset = 543L;
-        final StreamTask streamTask = new StreamTask(taskId00, partitions, topology, consumer,
-            changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
-
-            @Override
-            RecordCollector createRecordCollector(final LogContext logContext) {
-                return new NoOpRecordCollector() {
-                    @Override
-                    public Map<TopicPartition, Long> offsets() {
-
-                        return Collections.singletonMap(partition, offset);
-                    }
-                };
-            }
-        };
-        streamTask.initialize();
-
-        time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
-
-        streamTask.commit();
+        task = createStatefulTask(false, true);
+        task.initialize();
+        task.commit();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId00),
                                                                           ProcessorStateManager.CHECKPOINT_FILE_NAME));
 
-        assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, offset + 1)));
+        assertThat(checkpoint.read(), equalTo(Collections.singletonMap(changelogPartition, offset)));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
-        final Map<String, Object> properties = config.originals();
-        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-        final StreamsConfig testConfig = new StreamsConfig(properties);
-
-        final String storeName = "test";
-        final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName);
-        final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
-            @Override
-            public void init(final ProcessorContext context, final StateStore root) {
-                context.register(root, false, null);
-            }
-
-            @Override
-            public boolean persistent() {
-                return true;
-            }
-        };
-        Map<String, SourceNode> sourceByTopics =  new HashMap() {
-            {
-                put(partition1.topic(), source1);
-                put(partition2.topic(), source2);
-            }
-        };
-        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
-            sourceByTopics,
-            Collections.<String, SinkNode>emptyMap(),
-            Collections.<StateStore>singletonList(inMemoryStore),
-            Collections.singletonMap(storeName, changelogTopic),
-            Collections.<StateStore>emptyList());
-
-        final TopicPartition partition = new TopicPartition(changelogTopic, 0);
-
-        restoreStateConsumer.updatePartitions(changelogTopic,
-            Collections.singletonList(
-                new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0])));
-        restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
-        restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
-
-        final long offset = 543L;
-        final StreamTask streamTask = new StreamTask(taskId00, partitions, topology, consumer,
-            changelogReader, testConfig, streamsMetrics, stateDirectory, null, time, producer) {
-
-            @Override
-            RecordCollector createRecordCollector(final LogContext logContext) {
-                return new NoOpRecordCollector() {
-                    @Override
-                    public Map<TopicPartition, Long> offsets() {
-
-                        return Collections.singletonMap(partition, offset);
-                    }
-                };
-            }
-        };
-
-        time.sleep(testConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
-
-        streamTask.commit();
-
+        task = createStatefulTask(true, true);
+        task.initialize();
+        task.commit();
         final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00),
                                              ProcessorStateManager.CHECKPOINT_FILE_NAME);
+
         assertFalse(checkpointFile.exists());
     }
 
     @Test
     public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
+        task = createStatelessTask(false);
+        task.initialize();
         ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
         try {
             task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@@ -706,6 +578,8 @@ public class StreamTaskTest {
 
     @Test
     public void shouldCallPunctuateOnPassedInProcessorNode() {
+        task = createStatelessTask(false);
+        task.initialize();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
         assertThat(punctuatedAt, equalTo(5L));
         task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@@ -714,12 +588,15 @@ public class StreamTaskTest {
 
     @Test
     public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
+        task = createStatelessTask(false);
+        task.initialize();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
         assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
+        task = createStatelessTask(false);
         task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() {
             @Override
             public void punctuate(long timestamp) {
@@ -730,6 +607,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
+        task = createStatelessTask(false);
         ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
         task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() {
             @Override
@@ -741,8 +619,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() {
-        task.close(true, false);
-        task = createTaskThatThrowsExceptionOnClose();
+        task = createTaskThatThrowsException();
         task.initialize();
         try {
             task.close(true, false);
@@ -750,16 +627,14 @@ public class StreamTaskTest {
         } catch (final RuntimeException e) {
             task = null;
         }
+        assertTrue(processorSystemTime.closed);
         assertTrue(processorStreamTime.closed);
         assertTrue(source1.closed);
-        assertTrue(source2.closed);
     }
 
     @Test
     public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task = createStatelessTask(true);
 
         assertTrue(producer.transactionInitialized());
         assertTrue(producer.transactionInFlight());
@@ -767,9 +642,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            config, streamsMetrics, stateDirectory, null, time, producer);
+        task = createStatelessTask(false);
 
         assertFalse(producer.transactionInitialized());
         assertFalse(producer.transactionInFlight());
@@ -777,9 +650,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task = createStatelessTask(true);
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -793,26 +664,21 @@ public class StreamTaskTest {
 
     @Test
     public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
-
+        task = createStatelessTask(true);
         task.suspend();
+
         assertTrue(producer.transactionCommitted());
         assertFalse(producer.transactionInFlight());
     }
 
     @Test
     public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            config, streamsMetrics, stateDirectory, null, time, producer);
-
+        task = createStatelessTask(false);
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
         task.process();
-
         task.suspend();
+
         assertFalse(producer.sentOffsets());
         assertFalse(producer.transactionCommitted());
         assertFalse(producer.transactionInFlight());
@@ -820,9 +686,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldStartNewTransactionOnResumeIfEosEnabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task = createStatelessTask(true);
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -835,9 +699,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldNotStartNewTransactionOnResumeIfEosDisabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            config, streamsMetrics, stateDirectory, null, time, producer);
+        task = createStatelessTask(false);
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -850,9 +712,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldStartNewTransactionOnCommitIfEosEnabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task = createStatelessTask(true);
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -864,9 +724,7 @@ public class StreamTaskTest {
 
     @Test
     public void shouldNotStartNewTransactionOnCommitIfEosDisabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            config, streamsMetrics, stateDirectory, null, time, producer);
+        task = createStatelessTask(false);
 
         task.addRecords(partition1, Collections.singletonList(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -878,65 +736,44 @@ public class StreamTaskTest {
 
     @Test
     public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
-
+        task = createStatelessTask(true);
         task.close(false, false);
         task = null;
+
         assertTrue(producer.transactionAborted());
     }
 
     @Test
     public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() throws Exception {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
-
+        task = createStatelessTask(true);
         task.close(false, true);
         task = null;
+
         assertFalse(producer.transactionAborted());
     }
 
     @Test
     public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
-        final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
-            config, streamsMetrics, stateDirectory, null, time, producer);
-
+        task = createStatelessTask(false);
         task.close(false, false);
+        task = null;
+
         assertFalse(producer.transactionAborted());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCloseProducerOnCloseWhenEosEnabled() {
-        final MockProducer producer = new MockProducer();
-
-        task = new StreamTask(taskId00, partitions, topology, consumer,
-            changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
-
+        task = createStatelessTask(true);
         task.close(true, false);
         task = null;
+
         assertTrue(producer.closed());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhenCommitting() {
-        final MockProducer producer = new MockProducer();
-        final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class);
-        EasyMock.expect(consumer.committed(EasyMock.anyObject(TopicPartition.class)))
-                .andStubReturn(new OffsetAndMetadata(1L));
-        EasyMock.replay(consumer);
-        final StreamTask task = new StreamTask(taskId00, partitions, topology, consumer,
-                              changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer) {
-
-            @Override
-            protected void flushState() {
-                throw new RuntimeException("KABOOM!");
-            }
-        };
+    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() {
+        task = createTaskThatThrowsException();
+        task.initialize();
 
         try {
             task.commit();
@@ -944,32 +781,11 @@ public class StreamTaskTest {
         } catch (Exception e) {
             // all good
         }
-        EasyMock.verify(consumer);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
-        final MockProducer producer = new MockProducer();
-        final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class);
-        EasyMock.expect(consumer.committed(EasyMock.anyObject(TopicPartition.class)))
-                .andStubReturn(new OffsetAndMetadata(1L));
-        EasyMock.replay(consumer);
-        MockSourceNode sourceNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {
-            @Override
-            public void close() {
-                throw new RuntimeException("KABOOM!");
-            }
-        };
-
-        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>singletonList(sourceNode),
-                                                                 Collections.<String, SourceNode>singletonMap(topic1[0], sourceNode),
-                                                                 Collections.<String, SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>emptyList(),
-                                                                 Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore>emptyList());
-        final StreamTask task = new StreamTask(taskId00, Utils.mkSet(partition1), topology, consumer,
-                                               changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        final StreamTask task = createTaskThatThrowsException();
 
         task.initialize();
         try {
@@ -978,41 +794,29 @@ public class StreamTaskTest {
         } catch (Exception e) {
             // all good
         }
-        EasyMock.verify(consumer);
     }
 
     @Test
     public void shouldCloseStateManagerIfFailureOnTaskClose() {
-        final AtomicBoolean stateManagerCloseCalled = new AtomicBoolean(false);
-        final StreamTask streamTask = new StreamTask(taskId00, partitions, topology, consumer,
-                                               changelogReader, eosConfig, streamsMetrics, stateDirectory, null,
-                                                     time, new MockProducer<byte[], byte[]>()) {
-
-            @Override
-            void suspend(boolean val) {
-                throw new RuntimeException("KABOOM!");
-            }
-
-            @Override
-            void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
-                stateManagerCloseCalled.set(true);
-            }
-        };
+        task = createStatefulTaskThatThrowsExceptionOnClose(true, false);
+        task.initialize();
 
         try {
-            streamTask.close(true, false);
+            task.close(true, false);
             fail("should have thrown an exception");
         } catch (Exception e) {
             // all good
         }
-        assertTrue(stateManagerCloseCalled.get());
+
+        task = null;
+        assertFalse(stateStore.isOpen());
     }
 
     @Test
     public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
-        final StreamTask task = createTaskThatThrowsExceptionOnClose();
+        final StreamTask task = createTaskThatThrowsException();
         try {
-            task.close(true, false);
+            task.close(false, false);
         } catch (Exception e) {
             fail("should have not closed unitialized topology");
         }
@@ -1020,82 +824,129 @@ public class StreamTaskTest {
 
     @Test
     public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
-        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>singletonList(source1),
-                                                                 Collections.<String, SourceNode>singletonMap(topic1[0], source1),
-                                                                 Collections.<String, SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>singletonList(
-                                                                         new MockStateStoreSupplier.MockStateStore("store",
-                                                                                                                   false)),
-                                                                 Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore>emptyList());
-
-
-        final StreamTask task = new StreamTask(taskId00,
-                                               Utils.mkSet(partition1),
-                                               topology,
-                                               consumer,
-                                               changelogReader,
-                                               config,
-                                               streamsMetrics,
-                                               stateDirectory,
-                                               null,
-                                               time,
-                                               producer);
+        final StreamTask task = createStatefulTask(false, false);
 
         assertTrue(task.initialize());
     }
 
     @Test
     public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
-        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>singletonList(source1),
-                                                                 Collections.<String, SourceNode>singletonMap(topic1[0], source1),
-                                                                 Collections.<String, SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>singletonList(
-                                                                         new MockStateStoreSupplier.MockStateStore("store",
-                                                                                                                   false)),
-                                                                 Collections.singletonMap("store", "changelog"),
-                                                                 Collections.<StateStore>emptyList());
-
-
-        final StreamTask task = new StreamTask(taskId00,
-                                               Utils.mkSet(partition1),
-                                               topology,
-                                               consumer,
-                                               changelogReader,
-                                               config,
-                                               streamsMetrics,
-                                               stateDirectory,
-                                               null,
-                                               time,
-                                               producer);
+        final StreamTask task = createStatefulTask(false, true);
 
         assertFalse(task.initialize());
     }
 
-    @SuppressWarnings("unchecked")
-    private StreamTask createTaskThatThrowsExceptionOnClose() {
-        final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {
+    @Test
+    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
+        final TopicPartition repartition = new TopicPartition("repartition", 1);
+        final ProcessorTopology topology = ProcessorTopology.withRepartitionTopics(
+                Utils.<ProcessorNode>mkList(source1, source2),
+                new HashMap<String, SourceNode>() {
+                    {
+                        put(topic1, source1);
+                        put(repartition.topic(), source2);
+                    }
+                },
+                Collections.singleton(repartition.topic())
+        );
+        consumer.assign(Arrays.asList(partition1, repartition));
+
+        task = new StreamTask(taskId00, Utils.mkSet(partition1, repartition), topology, consumer, changelogReader, config,
+                streamsMetrics, stateDirectory, null, time, producer);
+        task.initialize();
+
+        task.addRecords(partition1, Collections.singletonList(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+        task.addRecords(repartition, Collections.singletonList(
+                new ConsumerRecord<>(repartition.topic(), repartition.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+
+        assertTrue(task.process());
+        assertTrue(task.process());
+
+        task.commit();
+
+        Map<TopicPartition, Long> map = task.purgableOffsets();
+
+        assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
+    }
+
+    private StreamTask createStatefulTask(final boolean eosEnabled, final boolean logged) {
+        final ProcessorTopology topology = ProcessorTopology.with(
+                Utils.<ProcessorNode>mkList(source1, source2),
+                new HashMap<String, SourceNode>() {
+                    {
+                        put(topic1, source1);
+                        put(topic2, source2);
+                    }
+                },
+                Collections.singletonList(stateStore),
+                logged ? Collections.singletonMap(storeName, storeName + "-changelog") : Collections.<String, String>emptyMap());
+
+        return new StreamTask(taskId00, partitions, topology, consumer, changelogReader, eosEnabled ? eosConfig : config,
+                streamsMetrics, stateDirectory, null, time, producer);
+    }
+
+    private StreamTask createStatefulTaskThatThrowsExceptionOnClose(final boolean eosEnabled, final boolean logged) {
+        final ProcessorTopology topology = ProcessorTopology.with(
+                Utils.<ProcessorNode>mkList(source1, source3),
+                new HashMap<String, SourceNode>() {
+                    {
+                        put(topic1, source1);
+                        put(topic2, source3);
+                    }
+                },
+                Collections.singletonList(stateStore),
+                logged ? Collections.singletonMap(storeName, changelogPartition.topic()) : Collections.<String, String>emptyMap());
+
+        return new StreamTask(taskId00, partitions, topology, consumer, changelogReader, eosEnabled ? eosConfig : config,
+                streamsMetrics, stateDirectory, null, time, producer);
+    }
+
+    private StreamTask createStatelessTask(final boolean eosEnabled) {
+        final ProcessorTopology topology = ProcessorTopology.withSources(
+                Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime),
+                new HashMap<String, SourceNode>() {
+                    {
+                        put(topic1, source1);
+                        put(topic2, source2);
+                    }
+                }
+        );
+
+        source1.addChild(processorStreamTime);
+        source2.addChild(processorStreamTime);
+        source1.addChild(processorSystemTime);
+        source2.addChild(processorSystemTime);
+
+        return new StreamTask(taskId00, partitions, topology, consumer, changelogReader, eosEnabled ? eosConfig : config,
+                streamsMetrics, stateDirectory, null, time, producer);
+    }
+
+    // this task will throw an exception when processing (on partition2), flushing, suspending, and closing
+    private StreamTask createTaskThatThrowsException() {
+        final ProcessorTopology topology = ProcessorTopology.withSources(
+                Utils.<ProcessorNode>mkList(source1, source3, processorStreamTime, processorSystemTime),
+                new HashMap<String, SourceNode>() {
+                    {
+                        put(topic1, source1);
+                        put(topic2, source3);
+                    }
+                }
+        );
+
+        source1.addChild(processorStreamTime);
+        source3.addChild(processorStreamTime);
+        source1.addChild(processorSystemTime);
+        source3.addChild(processorSystemTime);
+
+        return new StreamTask(taskId00, partitions, topology, consumer, changelogReader, config,
+            streamsMetrics, stateDirectory, null, time, producer) {
+
             @Override
-            public void close() {
+            protected void flushState() {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processorStreamTime, source1, source2);
-        final Map<String, SourceNode> sourceNodes = new HashMap() {
-            {
-                put(topic1[0], processorNode);
-                put(topic2[0], processorNode);
-            }
-        };
-        final ProcessorTopology topology = new ProcessorTopology(processorNodes,
-                                                                 sourceNodes,
-                                                                 Collections.<String, SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>emptyList(),
-                                                                 Collections.<String, String>emptyMap(),
-                                                                 Collections.<StateStore>emptyList());
-
-        return new StreamTask(taskId00, partitions, topology, consumer, changelogReader, config,
-            streamsMetrics, stateDirectory, null, time, producer);
     }
 
     private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... recs) {

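The StreamTaskTest changes above replace per-test StreamTask construction with shared factory methods (createStatelessTask, createStatefulTask, createTaskThatThrowsException), and the new shouldReturnOffsetsForRepartitionTopicsForPurging test pins down the purging contract: once a record at offset N on a repartition topic has been processed and committed, offset N + 1 (the committed position) is reported as purgeable. A minimal sketch of that invariant, using illustrative names that are not part of the patch:

    import org.apache.kafka.common.TopicPartition;

    import java.util.HashMap;
    import java.util.Map;

    final class PurgeableOffsetsSketch {
        // `consumedOffsets` stands in for the last record offset processed per
        // repartition-topic partition; everything below last-consumed + 1 is
        // safe to delete because it has already been committed.
        static Map<TopicPartition, Long> purgeableOffsets(final Map<TopicPartition, Long> consumedOffsets) {
            final Map<TopicPartition, Long> purgeable = new HashMap<>();
            for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
                purgeable.put(entry.getKey(), entry.getValue() + 1);
            }
            return purgeable;
        }
    }

This matches the test's assertion: the record consumed at offset 10 on the "repartition" topic yields a purgeable offset of 11.
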
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c3a372c..daf6fad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -328,7 +328,7 @@ public class StreamThreadTest {
 
     @SuppressWarnings({"ThrowableNotThrown", "unchecked"})
     private TaskManager mockTaskManagerCommit(final Consumer<byte[], byte[]> consumer, final int numberOfCommits, final int commits) {
-        final TaskManager taskManager = EasyMock.createMock(TaskManager.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
         EasyMock.expect(taskManager.commitAll()).andReturn(commits).times(numberOfCommits);
         EasyMock.replay(taskManager, consumer);
         return taskManager;

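The one-line StreamThreadTest change swaps a strict mock for a nice mock. In EasyMock, a mock from createMock() fails on any call that was not explicitly expected, while one from createNiceMock() tolerates unexpected calls and returns type-appropriate defaults (0, false, null); that tolerance is needed here because the TaskManager now receives calls (such as the purge path) that the commit-counting tests set no expectations for. A minimal illustration of the difference, using a stand-in interface rather than the real TaskManager:

    import org.easymock.EasyMock;

    public class NiceMockDemo {
        // stand-in for TaskManager, just to show the mock semantics
        interface Manager {
            int commitAll();
            void maybePurge();
        }

        public static void main(final String[] args) {
            final Manager strict = EasyMock.createMock(Manager.class);
            EasyMock.expect(strict.commitAll()).andReturn(1).once();
            EasyMock.replay(strict);
            strict.commitAll();     // expected, fine
            // strict.maybePurge(); // would throw: "Unexpected method call"

            final Manager nice = EasyMock.createNiceMock(Manager.class);
            EasyMock.replay(nice);
            nice.maybePurge();                      // tolerated, no-op
            System.out.println(nice.commitAll());   // prints the default 0
        }
    }
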
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 55dcf79..37a683c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -17,8 +17,14 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -84,6 +90,8 @@ public class TaskManagerTest {
     @Mock(type = MockType.NICE)
     private StreamsKafkaClient streamsKafkaClient;
     @Mock(type = MockType.NICE)
+    private AdminClient adminClient;
+    @Mock(type = MockType.NICE)
     private StreamTask streamTask;
     @Mock(type = MockType.NICE)
     private StandbyTask standbyTask;
@@ -121,6 +129,7 @@ public class TaskManagerTest {
                                       activeTaskCreator,
                                       standbyTaskCreator,
                                       streamsKafkaClient,
+                                      adminClient,
                                       active,
                                       standby);
         taskManager.setConsumer(consumer);
@@ -133,7 +142,8 @@ public class TaskManagerTest {
                         activeTaskCreator,
                         standbyTaskCreator,
                         active,
-                        standby);
+                        standby,
+                        adminClient);
     }
 
     @Test
@@ -543,6 +553,56 @@ public class TaskManagerTest {
     }
 
     @Test
+    public void shouldSendPurgeData() {
+        final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
+        final Map<TopicPartition, RecordsToDelete> recordsToDelete = Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
+        final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, (KafkaFuture<DeletedRecords>) futureDeletedRecords));
+
+        futureDeletedRecords.complete(null);
+
+        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
+        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
+        replay();
+
+        taskManager.maybePurgeCommitedRecords();
+        taskManager.maybePurgeCommitedRecords();
+        verify(active, adminClient);
+    }
+
+    @Test
+    public void shouldNotSendPurgeDataIfPreviousNotDone() {
+        final KafkaFuture<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
+        final Map<TopicPartition, RecordsToDelete> recordsToDelete = Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
+        final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, futureDeletedRecords));
+
+        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once();
+        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
+        replay();
+
+        taskManager.maybePurgeCommitedRecords();
+        // second call should be no-op as the previous one is not done yet
+        taskManager.maybePurgeCommitedRecords();
+        verify(active, adminClient);
+    }
+
+    @Test
+    public void shouldIgnorePurgeDataErrors() {
+        final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
+        final Map<TopicPartition, RecordsToDelete> recordsToDelete = Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
+        final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, (KafkaFuture<DeletedRecords>) futureDeletedRecords));
+
+        futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
+
+        EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2);
+        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
+        replay();
+
+        taskManager.maybePurgeCommitedRecords();
+        taskManager.maybePurgeCommitedRecords();
+        verify(active, adminClient);
+    }
+
+    @Test
     public void shouldMaybeCommitActiveTasks() {
         EasyMock.expect(active.maybeCommit()).andReturn(5);
         replay();

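The three new TaskManagerTest cases pin down the purge behavior KIP-204 adds: purge requests go through AdminClient.deleteRecords, no new request is issued while the previous one is still in flight, and failures are swallowed rather than propagated. A minimal sketch of that pattern (not the actual TaskManager code; the field and method names here are illustrative):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.TopicPartition;

    import java.util.HashMap;
    import java.util.Map;

    class PurgeSketch {
        private final AdminClient adminClient;
        private KafkaFuture<Void> deleteRecordsResult;  // previous request, if any

        PurgeSketch(final AdminClient adminClient) {
            this.adminClient = adminClient;
        }

        void maybePurgeCommittedRecords(final Map<TopicPartition, Long> purgeableOffsets) {
            // fire-and-forget: skip this round if the previous purge is still in flight
            if (deleteRecordsResult != null && !deleteRecordsResult.isDone()) {
                return;
            }
            final Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
            for (final Map.Entry<TopicPartition, Long> entry : purgeableOffsets.entrySet()) {
                toDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
            }
            // the future's result is deliberately never checked: a failed purge is
            // simply retried with fresher offsets on a later commit
            deleteRecordsResult = adminClient.deleteRecords(toDelete).all();
        }
    }
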
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStore.java b/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
new file mode 100644
index 0000000..f218f04
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStore.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.ArrayList;
+
+public class MockStateStore implements StateStore {
+    private final String name;
+    private final boolean persistent;
+
+    public boolean initialized = false;
+    public boolean flushed = false;
+    public boolean closed = true;
+    public final ArrayList<Integer> keys = new ArrayList<>();
+
+    public MockStateStore(final String name,
+                          final boolean persistent) {
+        this.name = name;
+        this.persistent = persistent;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        context.register(root, false, stateRestoreCallback);
+        initialized = true;
+        closed = false;
+    }
+
+    @Override
+    public void flush() {
+        flushed = true;
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+    }
+
+    @Override
+    public boolean persistent() {
+        return persistent;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return !closed;
+    }
+
+    public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
+        private final Deserializer<Integer> deserializer = new IntegerDeserializer();
+
+        @Override
+        public void restore(final byte[] key,
+                            final byte[] value) {
+            keys.add(deserializer.deserialize("", key));
+        }
+    };
+}

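The extracted MockStateStore (previously a nested class of MockStateStoreSupplier, removed in the next hunk) exposes its lifecycle as public flags, which is what lets StreamTaskTest assert directly on stateStore.isOpen() after a failed close. A sketch of typical usage inside a test body; `context` is assumed to be any ProcessorContext whose register() call is a no-op:

    final MockStateStore store = new MockStateStore("store", false);
    assertFalse(store.isOpen());          // reports closed until init() runs

    store.init(context, store);           // registers the restore callback and opens the store
    assertTrue(store.initialized);
    assertTrue(store.isOpen());

    store.flush();
    assertTrue(store.flushed);

    store.close();
    assertTrue(store.closed);
    assertFalse(store.isOpen());
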
http://git-wip-us.apache.org/repos/asf/kafka/blob/4b8a29f1/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 100a11c..042bb63 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -16,14 +16,9 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
 
@@ -65,63 +60,4 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
     public boolean loggingEnabled() {
         return loggingEnabled;
     }
-
-    public static class MockStateStore implements StateStore {
-        private final String name;
-        private final boolean persistent;
-
-        public boolean initialized = false;
-        public boolean flushed = false;
-        public boolean closed = true;
-        public final ArrayList<Integer> keys = new ArrayList<>();
-
-        public MockStateStore(final String name,
-                              final boolean persistent) {
-            this.name = name;
-            this.persistent = persistent;
-        }
-
-        @Override
-        public String name() {
-            return name;
-        }
-
-        @Override
-        public void init(final ProcessorContext context,
-                         final StateStore root) {
-            context.register(root, false, stateRestoreCallback);
-            initialized = true;
-            closed = false;
-        }
-
-        @Override
-        public void flush() {
-            flushed = true;
-        }
-
-        @Override
-        public void close() {
-            closed = true;
-        }
-
-        @Override
-        public boolean persistent() {
-            return persistent;
-        }
-
-        @Override
-        public boolean isOpen() {
-            return !closed;
-        }
-
-        public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
-            private final Deserializer<Integer> deserializer = new IntegerDeserializer();
-
-            @Override
-            public void restore(final byte[] key,
-                                final byte[] value) {
-                keys.add(deserializer.deserialize("", key));
-            }
-        };
-    }
 }

