kafka-commits mailing list archives

From guozh...@apache.org
Subject [3/3] kafka git commit: KAFKA-4923: Add Exactly-Once Semantics to Streams
Date Wed, 17 May 2017 00:23:16 GMT
KAFKA-4923: Add Exactly-Once Semantics to Streams

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Apurva Mehta, Ismael Juma, Damian Guy, Eno Thereska, Guozhang Wang

Closes #2945 from mjsax/kafka-4923-add-eos-to-streams


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ebc7f7ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ebc7f7ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ebc7f7ca

Branch: refs/heads/trunk
Commit: ebc7f7caaeb47c9588d79a2f3ed496daa0bd39e5
Parents: 670193f
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue May 16 17:23:11 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue May 16 17:23:11 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/MockConsumer.java    |  78 +--
 .../kafka/clients/producer/MockProducer.java    |  23 +
 .../kafka/common/config/AbstractConfig.java     |  16 +
 .../clients/producer/MockProducerTest.java      | 122 ++++
 .../org/apache/kafka/streams/StreamsConfig.java |  74 ++-
 .../processor/internals/AbstractTask.java       |  34 +-
 .../processor/internals/PartitionGroup.java     |  65 +-
 .../internals/ProcessorStateManager.java        |  17 +-
 .../processor/internals/RecordCollector.java    |  12 +
 .../internals/RecordCollectorImpl.java          |   9 +-
 .../processor/internals/StandbyContextImpl.java |  22 +-
 .../processor/internals/StandbyTask.java        |  15 +-
 .../streams/processor/internals/StreamTask.java | 158 +++--
 .../processor/internals/StreamThread.java       | 180 ++++--
 .../state/internals/OffsetCheckpoint.java       |  75 +--
 .../apache/kafka/streams/StreamsConfigTest.java | 171 ++++-
 .../processor/internals/AbstractTaskTest.java   |  13 +-
 .../processor/internals/PartitionGroupTest.java |  12 +-
 .../internals/ProcessorStateManagerTest.java    | 272 +++++---
 .../processor/internals/StreamTaskTest.java     | 289 ++++++---
 .../processor/internals/StreamThreadTest.java   | 645 +++++++++++++------
 .../StreamThreadStateStoreProviderTest.java     |  69 +-
 .../apache/kafka/test/MockClientSupplier.java   |  23 +-
 .../apache/kafka/test/NoOpRecordCollector.java  |   9 +-
 .../kafka/test/ProcessorTopologyTestDriver.java | 155 ++---
 25 files changed, 1812 insertions(+), 746 deletions(-)
----------------------------------------------------------------------
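
For orientation before the per-file hunks: from an application's point of view the new guarantee is opt-in through the existing processing.guarantee Streams config. A minimal sketch of enabling it and observing the lowered commit-interval default introduced below (hypothetical application code, not part of this commit; the application id and bootstrap server are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class EosConfigSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-sketch-app");       // placeholder id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder broker
            // opt in to exactly-once processing
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            final StreamsConfig config = new StreamsConfig(props);
            // with EOS enabled and no explicit commit.interval.ms, the default drops from 30000 to 100
            System.out.println(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
        }
    }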


http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index d81270a..91cb6f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -75,29 +75,29 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public Set<TopicPartition> assignment() {
+    public synchronized Set<TopicPartition> assignment() {
         return this.subscriptions.assignedPartitions();
     }
 
     /** Simulate a rebalance event. */
-    public void rebalance(Collection<TopicPartition> newAssignment) {
+    public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
         // TODO: Rebalance callbacks
         this.records.clear();
         this.subscriptions.assignFromSubscribed(newAssignment);
     }
 
     @Override
-    public Set<String> subscription() {
+    public synchronized Set<String> subscription() {
         return this.subscriptions.subscription();
     }
 
     @Override
-    public void subscribe(Collection<String> topics) {
+    public synchronized void subscribe(Collection<String> topics) {
         subscribe(topics, new NoOpConsumerRebalanceListener());
     }
 
     @Override
-    public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
+    public synchronized void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
         this.subscriptions.subscribe(pattern, listener);
         Set<String> topicsToSubscribe = new HashSet<>();
@@ -111,25 +111,25 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
+    public synchronized void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
         this.subscriptions.subscribe(new HashSet<>(topics), listener);
     }
 
     @Override
-    public void assign(Collection<TopicPartition> partitions) {
+    public synchronized void assign(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         this.subscriptions.assignFromUser(new HashSet<>(partitions));
     }
 
     @Override
-    public void unsubscribe() {
+    public synchronized void unsubscribe() {
         ensureNotClosed();
         subscriptions.unsubscribe();
     }
 
     @Override
-    public ConsumerRecords<K, V> poll(long timeout) {
+    public synchronized ConsumerRecords<K, V> poll(long timeout) {
         ensureNotClosed();
 
         // Synchronize around the entire execution so new tasks to be triggered on subsequent poll calls can be added in
@@ -176,7 +176,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         return new ConsumerRecords<>(results);
     }
 
-    public void addRecord(ConsumerRecord<K, V> record) {
+    public synchronized void addRecord(ConsumerRecord<K, V> record) {
         ensureNotClosed();
         TopicPartition tp = new TopicPartition(record.topic(), record.partition());
         Set<TopicPartition> currentAssigned = new HashSet<>(this.subscriptions.assignedPartitions());
@@ -190,12 +190,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         recs.add(record);
     }
 
-    public void setException(KafkaException exception) {
+    public synchronized void setException(KafkaException exception) {
         this.exception = exception;
     }
 
     @Override
-    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+    public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         ensureNotClosed();
         for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet())
             subscriptions.committed(entry.getKey(), entry.getValue());
@@ -205,34 +205,34 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+    public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
         commitAsync(offsets, null);
     }
 
     @Override
-    public void commitAsync() {
+    public synchronized void commitAsync() {
         commitAsync(null);
     }
 
     @Override
-    public void commitAsync(OffsetCommitCallback callback) {
+    public synchronized void commitAsync(OffsetCommitCallback callback) {
         ensureNotClosed();
         commitAsync(this.subscriptions.allConsumed(), callback);
     }
 
     @Override
-    public void commitSync() {
+    public synchronized void commitSync() {
         commitSync(this.subscriptions.allConsumed());
     }
 
     @Override
-    public void seek(TopicPartition partition, long offset) {
+    public synchronized void seek(TopicPartition partition, long offset) {
         ensureNotClosed();
         subscriptions.seek(partition, offset);
     }
 
     @Override
-    public OffsetAndMetadata committed(TopicPartition partition) {
+    public synchronized OffsetAndMetadata committed(TopicPartition partition) {
         ensureNotClosed();
         if (subscriptions.isAssigned(partition)) {
             return subscriptions.committed(partition);
@@ -241,7 +241,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public long position(TopicPartition partition) {
+    public synchronized long position(TopicPartition partition) {
         ensureNotClosed();
         if (!this.subscriptions.isAssigned(partition))
             throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
@@ -254,52 +254,52 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void seekToBeginning(Collection<TopicPartition> partitions) {
+    public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         for (TopicPartition tp : partitions)
             subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
     }
 
-    public void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
+    public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
         beginningOffsets.putAll(newOffsets);
     }
 
     @Override
-    public void seekToEnd(Collection<TopicPartition> partitions) {
+    public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         for (TopicPartition tp : partitions)
             subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
     }
 
-    public void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
+    public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
         endOffsets.putAll(newOffsets);
     }
 
     @Override
-    public Map<MetricName, ? extends Metric> metrics() {
+    public synchronized Map<MetricName, ? extends Metric> metrics() {
         ensureNotClosed();
         return Collections.emptyMap();
     }
 
     @Override
-    public List<PartitionInfo> partitionsFor(String topic) {
+    public synchronized List<PartitionInfo> partitionsFor(String topic) {
         ensureNotClosed();
         return this.partitions.get(topic);
     }
 
     @Override
-    public Map<String, List<PartitionInfo>> listTopics() {
+    public synchronized Map<String, List<PartitionInfo>> listTopics() {
         ensureNotClosed();
         return partitions;
     }
 
-    public void updatePartitions(String topic, List<PartitionInfo> partitions) {
+    public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
         ensureNotClosed();
         this.partitions.put(topic, partitions);
     }
 
     @Override
-    public void pause(Collection<TopicPartition> partitions) {
+    public synchronized void pause(Collection<TopicPartition> partitions) {
         for (TopicPartition partition : partitions) {
             subscriptions.pause(partition);
             paused.add(partition);
@@ -307,7 +307,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void resume(Collection<TopicPartition> partitions) {
+    public synchronized void resume(Collection<TopicPartition> partitions) {
         for (TopicPartition partition : partitions) {
             subscriptions.resume(partition);
             paused.remove(partition);
@@ -315,12 +315,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+    public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     @Override
-    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+    public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long beginningOffset = beginningOffsets.get(tp);
@@ -332,7 +332,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+    public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long endOffset = endOffsets.get(tp);
@@ -344,22 +344,22 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         close(KafkaConsumer.DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
     }
 
     @Override
-    public void close(long timeout, TimeUnit unit) {
+    public synchronized void close(long timeout, TimeUnit unit) {
         ensureNotClosed();
         this.closed = true;
     }
 
-    public boolean closed() {
+    public synchronized boolean closed() {
         return this.closed;
     }
 
     @Override
-    public void wakeup() {
+    public synchronized void wakeup() {
         wakeup.set(true);
     }
 
@@ -368,13 +368,13 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
      * invocation. You can use this repeatedly to mock out multiple responses to poll invocations.
      * @param task the task to be executed
      */
-    public void schedulePollTask(Runnable task) {
+    public synchronized void schedulePollTask(Runnable task) {
         synchronized (pollTasks) {
             pollTasks.add(task);
         }
     }
 
-    public void scheduleNopPollTask() {
+    public synchronized void scheduleNopPollTask() {
         schedulePollTask(new Runnable() {
             @Override
             public void run() {
@@ -383,7 +383,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         });
     }
 
-    public Set<TopicPartition> paused() {
+    public synchronized Set<TopicPartition> paused() {
         return Collections.unmodifiableSet(new HashSet<>(paused));
     }
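
All public MockConsumer methods above now carry the synchronized keyword, so a driving test thread can feed records or schedule poll tasks while another thread polls the same mock. A minimal sketch of the mock's basic record flow that this thread-safety supports (hypothetical test code, not part of this commit; topic name, key, and value are placeholders):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class MockConsumerSketch {
        public static void main(final String[] args) {
            final TopicPartition tp = new TopicPartition("input-topic", 0);   // placeholder topic
            final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

            consumer.assign(Collections.singleton(tp));
            consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
            consumer.seek(tp, 0L);

            // with the synchronized methods, this call could just as well come from another thread
            consumer.addRecord(new ConsumerRecord<>("input-topic", 0, 0L, "key", "value"));

            final ConsumerRecords<String, String> records = consumer.poll(100);
            System.out.println(records.count());   // 1
        }
    }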
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 15ea454..22fa755 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -37,6 +37,7 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -65,6 +66,8 @@ public class MockProducer<K, V> implements Producer<K, V> {
     private boolean transactionCommitted;
     private boolean transactionAborted;
     private boolean producerFenced;
+    private boolean sentOffsets;
+    private long commitCount = 0L;
 
     /**
      * Create a mock producer
@@ -148,6 +151,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.transactionInFlight = true;
         this.transactionCommitted = false;
         this.transactionAborted = false;
+        this.sentOffsets = false;
     }
 
     @Override
@@ -156,12 +160,17 @@ public class MockProducer<K, V> implements Producer<K, V> {
         verifyProducerState();
         verifyTransactionsInitialized();
         verifyNoTransactionInFlight();
+        Objects.requireNonNull(consumerGroupId);
+        if (offsets.size() == 0) {
+            return;
+        }
         Map<TopicPartition, OffsetAndMetadata> uncommittedOffsets = this.uncommittedConsumerGroupOffsets.get(consumerGroupId);
         if (uncommittedOffsets == null) {
             uncommittedOffsets = new HashMap<>();
             this.uncommittedConsumerGroupOffsets.put(consumerGroupId, uncommittedOffsets);
         }
         uncommittedOffsets.putAll(offsets);
+        this.sentOffsets = true;
     }
 
     @Override
@@ -182,6 +191,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.transactionAborted = false;
         this.transactionInFlight = false;
 
+        ++this.commitCount;
     }
 
     @Override
@@ -276,6 +286,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
     }
 
     public synchronized void flush() {
+        verifyProducerState();
         while (!this.completions.isEmpty())
             completeNext();
     }
@@ -329,6 +340,18 @@ public class MockProducer<K, V> implements Producer<K, V> {
         return this.transactionAborted;
     }
 
+    public boolean flushed() {
+        return this.completions.isEmpty();
+    }
+
+    public boolean sentOffsets() {
+        return this.sentOffsets;
+    }
+
+    public long commitCount() {
+        return this.commitCount;
+    }
+
     /**
      * Get the list of sent records since the last call to {@link #clear()}
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index aa8cf0d..dc7fd7c 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -60,6 +60,11 @@ public class AbstractConfig {
                 throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
         this.originals = (Map<String, ?>) originals;
         this.values = definition.parse(this.originals);
+        Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
+        for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
+            this.values.put(update.getKey(), update.getValue());
+        }
+        definition.parse(this.values);
         this.used = Collections.synchronizedSet(new HashSet<String>());
         this.definition = definition;
         if (doLog)
@@ -70,6 +75,17 @@ public class AbstractConfig {
         this(definition, originals, true);
     }
 
+    /**
+     * Called directly after user configs were parsed (and thus default values were set).
+     * This allows changing default values for "secondary defaults" if required.
+     *
+     * @param parsedValues unmodifiable map of current configuration
+     * @return a map of updates that should be applied to the configuration (will be validated to prevent bad updates)
+     */
+    protected Map<String, Object> postProcessParsedConfig(Map<String, Object> parsedValues) {
+        return Collections.emptyMap();
+    }
+
     protected Object get(String key) {
         if (!values.containsKey(key))
             throw new ConfigException(String.format("Unknown configuration '%s'", key));
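
The new postProcessParsedConfig hook lets a subclass adjust defaults after parsing; StreamsConfig further down in this commit uses it to lower the commit.interval.ms default when exactly-once is enabled. A stripped-down sketch of the same pattern (hypothetical config class, not from this commit; config names and values are placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;

    public class MyConfig extends AbstractConfig {
        private static final ConfigDef CONFIG = new ConfigDef()
            .define("mode", Type.STRING, "default", Importance.LOW, "operating mode")
            .define("interval.ms", Type.LONG, 30000L, Importance.LOW, "some interval");

        public MyConfig(final Map<?, ?> props) {
            super(CONFIG, props);
        }

        @Override
        protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
            // a "secondary default": if the user picked the fast mode but did not set the
            // interval explicitly, override the interval's default with a smaller value
            final Map<String, Object> updates = new HashMap<>();
            if ("fast".equals(parsedValues.get("mode")) && !originals().containsKey("interval.ms")) {
                updates.put("interval.ms", 100L);
            }
            return updates;
        }
    }

Constructing the class with mode set to "fast" and no explicit interval.ms would then return 100 from getLong("interval.ms"); the returned updates are re-validated by the extra definition.parse() call added above.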

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 468ea49..eeb9b5f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -171,6 +171,28 @@ public class MockProducerTest {
         assertFalse(producer.transactionAborted());
     }
 
+    @Test
+    public void shouldCountCommittedTransaction() {
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        assertThat(producer.commitCount(), equalTo(0L));
+        producer.commitTransaction();
+        assertThat(producer.commitCount(), equalTo(1L));
+    }
+
+    @Test
+    public void shouldNotCountAbortedTransaction() {
+        producer.initTransactions();
+
+        producer.beginTransaction();
+        producer.abortTransaction();
+
+        producer.beginTransaction();
+        producer.commitTransaction();
+        assertThat(producer.commitCount(), equalTo(1L));
+    }
+
     @Test(expected = IllegalStateException.class)
     public void shouldThrowOnAbortIfTransactionsNotInitialized() {
         producer.abortTransaction();
@@ -231,6 +253,16 @@ public class MockProducerTest {
     }
 
     @Test
+    public void shouldThrowOnFlushIfProducerGotFenced() {
+        producer.initTransactions();
+        producer.fenceProducer();
+        try {
+            producer.flush();
+            fail("Should have thrown as producer is fenced off");
+        } catch (ProducerFencedException e) { }
+    }
+
+    @Test
     public void shouldThrowOnSendOffsetsToTransactionIfProducerGotFenced() {
         producer.initTransactions();
         producer.fenceProducer();
@@ -377,6 +409,61 @@ public class MockProducerTest {
     }
 
     @Test
+    public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() {
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        try {
+            producer.sendOffsetsToTransaction(Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), null);
+            fail("Should have thrown NullPointerException");
+        } catch (NullPointerException e) { }
+    }
+
+    @Test
+    public void shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransaction() {
+        producer.initTransactions();
+        producer.beginTransaction();
+        producer.sendOffsetsToTransaction(Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), "groupId");
+        assertFalse(producer.sentOffsets());
+    }
+
+    @Test
+    public void shouldAddOffsetsWhenSendOffsetsToTransaction() {
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        assertFalse(producer.sentOffsets());
+
+        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+            {
+                put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+            }
+        };
+        producer.sendOffsetsToTransaction(groupCommit, "groupId");
+        assertTrue(producer.sentOffsets());
+    }
+
+    @Test
+    public void shouldResetSentOffsetsFlagOnlyWhenBeginningNewTransaction() {
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        assertFalse(producer.sentOffsets());
+
+        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+            {
+                put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+            }
+        };
+        producer.sendOffsetsToTransaction(groupCommit, "groupId");
+        producer.commitTransaction(); // commit should not reset "sentOffsets" flag
+        assertTrue(producer.sentOffsets());
+
+        producer.beginTransaction();
+        assertFalse(producer.sentOffsets());
+    }
+
+    @Test
     public void shouldPublishLatestAndCumulativeConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
         producer.initTransactions();
         producer.beginTransaction();
@@ -528,6 +615,41 @@ public class MockProducerTest {
         } catch (IllegalStateException e) { }
     }
 
+    @Test
+    public void shouldThrowOnFlushProducerIfProducerIsClosed() {
+        producer.close();
+        try {
+            producer.flush();
+            fail("Should have thrown as producer is already closed");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldBeFlushedIfNoBufferedRecords() {
+        assertTrue(producer.flushed());
+    }
+
+    @Test
+    public void shouldBeFlushedWithAutoCompleteIfBufferedRecords() {
+        producer.send(record1);
+        assertTrue(producer.flushed());
+    }
+
+    @Test
+    public void shouldNotBeFlushedWithNoAutoCompleteIfBufferedRecords() {
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        producer.send(record1);
+        assertFalse(producer.flushed());
+    }
+
+    @Test
+    public void shouldNotBeFlushedAfterFlush() {
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        producer.send(record1);
+        producer.flush();
+        assertTrue(producer.flushed());
+    }
+
     private boolean isError(Future<?> future) {
         try {
             future.get();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index c1fd2d6..af9b8e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -35,14 +35,18 @@ import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
 
 /**
  * Configuration for a {@link KafkaStreams} instance.
@@ -78,8 +82,14 @@ import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
  */
 public class StreamsConfig extends AbstractConfig {
 
+    private final static Logger log = LoggerFactory.getLogger(StreamsConfig.class);
+
     private static final ConfigDef CONFIG;
 
+    private final boolean eosEnabled;
+    private final static long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
+    private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
+
     /**
      * Prefix used to isolate {@link KafkaConsumer consumer} configs from {@link KafkaProducer producer} configs.
      * It is recommended to use {@link #consumerPrefix(String)} to add this prefix to {@link ConsumerConfig consumer
@@ -129,7 +139,9 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code commit.interval.ms} */
     public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
-    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
+    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor." +
+        " (Note, if 'processing.guarantee' is set to '" + EXACTLY_ONCE + "', the default value is " + EOS_DEFAULT_COMMIT_INTERVAL_MS + "," +
+        " otherwise the default value is " + DEFAULT_COMMIT_INTERVAL_MS + ".";
 
     /** {@code connections.max.idle.ms} */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
@@ -321,7 +333,7 @@ public class StreamsConfig extends AbstractConfig {
                         DEFAULT_VALUE_SERDE_CLASS_DOC)
             .define(COMMIT_INTERVAL_MS_CONFIG,
                     Type.LONG,
-                    30000,
+                    DEFAULT_COMMIT_INTERVAL_MS,
                     Importance.LOW,
                     COMMIT_INTERVAL_MS_DOC)
             .define(POLL_MS_CONFIG,
@@ -458,6 +470,16 @@ public class StreamsConfig extends AbstractConfig {
         PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
+    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    static {
+        final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+
+        PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
     private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
     static {
         final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
@@ -475,6 +497,13 @@ public class StreamsConfig extends AbstractConfig {
         CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 
+    private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
+    static {
+        final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+        tempConsumerDefaultOverrides.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+        CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+    }
+
     public static class InternalConfig {
         public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
     }
@@ -517,6 +546,21 @@ public class StreamsConfig extends AbstractConfig {
      */
     public StreamsConfig(final Map<?, ?> props) {
         super(CONFIG, props);
+        eosEnabled = EXACTLY_ONCE.equals(getString(PROCESSING_GUARANTEE_CONFIG));
+    }
+
+    @Override
+    protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
+        final Map<String, Object> configUpdates = new HashMap<>();
+
+        final boolean eosEnabled = EXACTLY_ONCE.equals(parsedValues.get(PROCESSING_GUARANTEE_CONFIG));
+        if (eosEnabled && !originals().containsKey(COMMIT_INTERVAL_MS_CONFIG)) {
+            log.debug("Using " + COMMIT_INTERVAL_MS_CONFIG + " default value of "
+                + EOS_DEFAULT_COMMIT_INTERVAL_MS + " as exactly once is enabled.");
+            configUpdates.put(COMMIT_INTERVAL_MS_CONFIG, EOS_DEFAULT_COMMIT_INTERVAL_MS);
+        }
+
+        return configUpdates;
     }
 
     private Map<String, Object> getCommonConsumerConfigs() throws ConfigException {
@@ -528,8 +572,14 @@ public class StreamsConfig extends AbstractConfig {
             throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
                 + ", as the streams client will always turn off auto committing.");
         }
+        if (eosEnabled) {
+            if (clientProvidedProps.containsKey(ConsumerConfig.ISOLATION_LEVEL_CONFIG)) {
+                throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ISOLATION_LEVEL_CONFIG
+                    + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' consumers will always read committed data only.");
+            }
+        }
 
-        final Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
@@ -604,9 +654,23 @@ public class StreamsConfig extends AbstractConfig {
      * @return Map of the producer configuration.
      */
     public Map<String, Object> getProducerConfigs(final String clientId) {
+        final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
+
+        if (eosEnabled) {
+            if (clientProvidedProps.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
+                throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
+                    + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have idempotency enabled.");
+            }
+
+            if (clientProvidedProps.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+                throw new ConfigException("Unexpected user-specified consumer config " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
+                    + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' producer will always have only one in-flight request per connection.");
+            }
+        }
+
         // generate producer configs from original properties and overridden maps
-        final Map<String, Object> props = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
-        props.putAll(getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()));
+        final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
+        props.putAll(clientProvidedProps);
 
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
         // add client id with stream client id prefix
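
Taken together, the EOS override maps above pin the embedded clients' settings: the producer gets idempotence, unlimited retries, and a single in-flight request per connection, the consumer reads committed data only, and user attempts to override any of these now fail with a ConfigException. A minimal sketch that inspects the resulting producer configs (hypothetical code, not part of this commit; application id and bootstrap server are placeholders):

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class EosProducerDefaultsSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-sketch-app");        // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            final Map<String, Object> producerProps = new StreamsConfig(props).getProducerConfigs("clientId");
            // EOS producer overrides applied by the code above:
            System.out.println(producerProps.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));              // true
            System.out.println(producerProps.get(ProducerConfig.RETRIES_CONFIG));                         // 2147483647 (Integer.MAX_VALUE)
            System.out.println(producerProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));  // 1
            // setting enable.idempotence, max.in.flight.requests.per.connection, or the consumer's
            // isolation.level yourself now throws a ConfigException, per the checks above
        }
    }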

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d546118..d97f8f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -40,15 +41,16 @@ import java.util.Set;
 public abstract class AbstractTask {
     private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);
 
-    private final TaskId id;
-    protected final String applicationId;
-    protected final ProcessorTopology topology;
-    protected final Consumer consumer;
-    protected final ProcessorStateManager stateMgr;
-    protected final Set<TopicPartition> partitions;
+    final TaskId id;
+    final String applicationId;
+    final ProcessorTopology topology;
+    final Consumer consumer;
+    final ProcessorStateManager stateMgr;
+    final Set<TopicPartition> partitions;
     InternalProcessorContext processorContext;
-    protected final ThreadCache cache;
+    private final ThreadCache cache;
     final String logPrefix;
+    final boolean eosEnabled;
 
     /**
      * @throws ProcessorStateException if the state manager cannot be created
@@ -61,28 +63,38 @@ public abstract class AbstractTask {
                  final ChangelogReader changelogReader,
                  final boolean isStandby,
                  final StateDirectory stateDirectory,
-                 final ThreadCache cache) {
+                 final ThreadCache cache,
+                 final StreamsConfig config) {
         this.id = id;
         this.applicationId = applicationId;
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
         this.consumer = consumer;
         this.cache = cache;
+        eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
 
         logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", id());
 
         // create the processor state manager
         try {
-            stateMgr = new ProcessorStateManager(id, partitions, isStandby, stateDirectory, topology.storeToChangelogTopic(), changelogReader);
+            stateMgr = new ProcessorStateManager(
+                id,
+                partitions,
+                isStandby,
+                stateDirectory,
+                topology.storeToChangelogTopic(),
+                changelogReader,
+                eosEnabled);
         } catch (final IOException e) {
             throw new ProcessorStateException(String.format("%s Error while creating the state manager", logPrefix), e);
         }
     }
 
     public abstract void resume();
+
     public abstract void commit();
     public abstract void suspend();
-    public abstract void close();
+    public abstract void close(final boolean clean);
 
     public final TaskId id() {
         return id;
@@ -108,7 +120,6 @@ public abstract class AbstractTask {
         return cache;
     }
 
-
     public StateStore getStore(final String name) {
         return stateMgr.getStore(name);
     }
@@ -200,5 +211,4 @@ public abstract class AbstractTask {
         stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index bae6f22..1e875b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -36,7 +36,7 @@ public class PartitionGroup {
     private final PriorityQueue<RecordQueue> queuesByTime;
 
     public static class RecordInfo {
-        public RecordQueue queue;
+        RecordQueue queue;
 
         public ProcessorNode node() {
             return queue.source();
@@ -46,7 +46,7 @@ public class PartitionGroup {
             return queue.partition();
         }
 
-        public RecordQueue queue() {
+        RecordQueue queue() {
             return queue;
         }
     }
@@ -54,23 +54,27 @@ public class PartitionGroup {
     // since task is thread-safe, we do not need to synchronize on local variables
     private int totalBuffered;
 
-    public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues) {
-        this.queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() {
+    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
+        queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() {
 
             @Override
-            public int compare(RecordQueue queue1, RecordQueue queue2) {
-                long time1 = queue1.timestamp();
-                long time2 = queue2.timestamp();
-
-                if (time1 < time2) return -1;
-                if (time1 > time2) return 1;
+            public int compare(final RecordQueue queue1, final RecordQueue queue2) {
+                final long time1 = queue1.timestamp();
+                final long time2 = queue2.timestamp();
+
+                if (time1 < time2) {
+                    return -1;
+                }
+                if (time1 > time2) {
+                    return 1;
+                }
                 return 0;
             }
         });
 
         this.partitionQueues = partitionQueues;
 
-        this.totalBuffered = 0;
+        totalBuffered = 0;
     }
 
     /**
@@ -78,10 +82,10 @@ public class PartitionGroup {
      *
      * @return StampedRecord
      */
-    public StampedRecord nextRecord(RecordInfo info) {
+    StampedRecord nextRecord(final RecordInfo info) {
         StampedRecord record = null;
 
-        RecordQueue queue = queuesByTime.poll();
+        final RecordQueue queue = queuesByTime.poll();
         if (queue != null) {
             // get the first record from this queue.
             record = queue.poll();
@@ -92,7 +96,9 @@ public class PartitionGroup {
         }
         info.queue = queue;
 
-        if (record != null) totalBuffered--;
+        if (record != null) {
+            --totalBuffered;
+        }
 
         return record;
     }
@@ -104,11 +110,11 @@ public class PartitionGroup {
      * @param rawRecords  the raw records
      * @return the queue size for the partition
      */
-    public int addRawRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
-        RecordQueue recordQueue = partitionQueues.get(partition);
+    int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
+        final RecordQueue recordQueue = partitionQueues.get(partition);
 
-        int oldSize = recordQueue.size();
-        int newSize = recordQueue.addRawRecords(rawRecords);
+        final int oldSize = recordQueue.size();
+        final int newSize = recordQueue.addRawRecords(rawRecords);
 
         // add this record queue to be considered for processing in the future if it was empty before
         if (oldSize == 0 && newSize > 0) {
@@ -132,9 +138,10 @@ public class PartitionGroup {
         // we should always return the smallest timestamp of all partitions
         // to avoid group partition time goes backward
         long timestamp = Long.MAX_VALUE;
-        for (RecordQueue queue : partitionQueues.values()) {
-            if (timestamp > queue.timestamp())
+        for (final RecordQueue queue : partitionQueues.values()) {
+            if (timestamp > queue.timestamp()) {
                 timestamp = queue.timestamp();
+            }
         }
         return timestamp;
     }
@@ -142,21 +149,17 @@ public class PartitionGroup {
     /**
      * @throws IllegalStateException if the record's partition does not belong to this partition group
      */
-    public int numBuffered(TopicPartition partition) {
-        RecordQueue recordQueue = partitionQueues.get(partition);
+    int numBuffered(final TopicPartition partition) {
+        final RecordQueue recordQueue = partitionQueues.get(partition);
 
-        if (recordQueue == null)
+        if (recordQueue == null) {
             throw new IllegalStateException("Record's partition does not belong to this partition-group.");
+        }
 
         return recordQueue.size();
     }
 
-    public int topQueueSize() {
-        RecordQueue recordQueue = queuesByTime.peek();
-        return (recordQueue == null) ? 0 : recordQueue.size();
-    }
-
-    public int numBuffered() {
+    int numBuffered() {
         return totalBuffered;
     }
 
@@ -167,8 +170,8 @@ public class PartitionGroup {
 
     public void clear() {
         queuesByTime.clear();
-        for (RecordQueue queue : partitionQueues.values()) {
+        for (final RecordQueue queue : partitionQueues.values()) {
             queue.clear();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1a0e34a..d1bdf95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -58,11 +58,12 @@ public class ProcessorStateManager implements StateManager {
     private final Map<TopicPartition, Long> checkpointedOffsets;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
     private final Map<String, String> storeToChangelogTopic;
+    private final boolean eosEnabled;
 
     // TODO: this map does not work with customized grouper where multiple partitions
     // of the same topic can be assigned to the same topic.
     private final Map<String, TopicPartition> partitionForTopic;
-    private final OffsetCheckpoint checkpoint;
+    private OffsetCheckpoint checkpoint;
 
     /**
      * @throws LockException if the state directory cannot be locked because another thread holds the lock
@@ -74,7 +75,8 @@ public class ProcessorStateManager implements StateManager {
                                  final boolean isStandby,
                                  final StateDirectory stateDirectory,
                                  final Map<String, String> storeToChangelogTopic,
-                                 final ChangelogReader changelogReader) throws LockException, IOException {
+                                 final ChangelogReader changelogReader,
+                                 final boolean eosEnabled) throws LockException, IOException {
         this.taskId = taskId;
         this.stateDirectory = stateDirectory;
         this.changelogReader = changelogReader;
@@ -91,6 +93,7 @@ public class ProcessorStateManager implements StateManager {
         this.isStandby = isStandby;
         restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
+        this.eosEnabled = eosEnabled;
 
         if (!stateDirectory.lock(taskId, 5)) {
             throw new LockException(String.format("%s Failed to lock the state directory for task %s",
@@ -110,6 +113,12 @@ public class ProcessorStateManager implements StateManager {
         checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
         checkpointedOffsets = new HashMap<>(checkpoint.read());
 
+        if (eosEnabled) {
+            // delete the checkpoint file after finish loading its stored offsets
+            checkpoint.delete();
+            checkpoint = null;
+        }
+
         log.info("{} Created state store manager for task {} with the acquired state dir lock", logPrefix, taskId);
     }
 
@@ -325,6 +334,9 @@ public class ProcessorStateManager implements StateManager {
         }
         // write the checkpoint file before closing, to indicate clean shutdown
         try {
+            if (checkpoint == null) {
+                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
+            }
             checkpoint.write(checkpointedOffsets);
         } catch (final IOException e) {
             log.warn("Failed to write checkpoint file to {}", new File(baseDir, CHECKPOINT_FILE_NAME), e);
@@ -333,7 +345,6 @@ public class ProcessorStateManager implements StateManager {
 
     private int getPartition(final String topic) {
         final TopicPartition partition = partitionForTopic.get(topic);
-
         return partition == null ? taskId.partition : partition.partition();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 4516f8c..b083869 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -40,10 +41,21 @@ public interface RecordCollector {
                      final Serializer<V> valueSerializer,
                      final StreamPartitioner<? super K, ? super V> partitioner);
 
+    /**
+     * Flush the internal {@link Producer}.
+     */
     void flush();
 
+    /**
+     * Close the internal {@link Producer}.
+     */
     void close();
 
+    /**
+     * The last acked offsets from the internal {@link Producer}.
+     *
+     * @return the map from TopicPartition to offset
+     */
     Map<TopicPartition, Long> offsets();
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 9d2ac03..e1a86b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -136,20 +136,13 @@ public class RecordCollectorImpl implements RecordCollector {
         checkForException();
     }
 
-    /**
-     * Closes this RecordCollector
-     */
     @Override
     public void close() {
+        log.debug("{} Closing producer", logPrefix);
         producer.close();
         checkForException();
     }
 
-    /**
-     * The last ack'd offset from the producer
-     *
-     * @return the map from TopicPartition to offset
-     */
     @Override
     public Map<TopicPartition, Long> offsets() {
         return offsets;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index d738a19..0791c67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+
 import java.util.Collections;
 import java.util.Map;
 
@@ -50,14 +51,10 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
                                 final StreamPartitioner<? super K, ? super V> partitioner) {}
 
         @Override
-        public void flush() {
-
-        }
+        public void flush() {}
 
         @Override
-        public void close() {
-
-        }
+        public void close() {}
 
         @Override
         public Map<TopicPartition, Long> offsets() {
@@ -65,11 +62,11 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         }
     };
 
-    public StandbyContextImpl(final TaskId id,
-                              final String applicationId,
-                              final StreamsConfig config,
-                              final ProcessorStateManager stateMgr,
-                              final StreamsMetrics metrics) {
+    StandbyContextImpl(final TaskId id,
+                       final String applicationId,
+                       final StreamsConfig config,
+                       final ProcessorStateManager stateMgr,
+                       final StreamsMetrics metrics) {
         super(id, applicationId, config, metrics, stateMgr, new ThreadCache("zeroCache", 0, metrics));
     }
 
@@ -163,7 +160,6 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
     }
 
-
     /**
      * @throws UnsupportedOperationException on every invocation
      */
@@ -180,7 +176,6 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks.");
     }
 
-
     @Override
     public void setCurrentNode(final ProcessorNode currentNode) {
         // no-op. can't throw as this is called on commit when the StateStores get flushed.
@@ -193,4 +188,5 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     public ProcessorNode currentNode() {
         throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index b09c8cd..b06b998 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -59,7 +59,7 @@ public class StandbyTask extends AbstractTask {
                 final StreamsConfig config,
                 final StreamsMetrics metrics,
                 final StateDirectory stateDirectory) {
-        super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, null);
+        super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, null, config);
 
         // initialize the topology with its own context
         processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
@@ -110,8 +110,16 @@ public class StandbyTask extends AbstractTask {
         stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
     }
 
+    /**
+     * <pre>
+     * - {@link #commit()}
+     * - close state
+     * <pre>
+     * @param clean ignored by {@code StandbyTask} as it can always try to close cleanly
+     *              (ie, commit, flush, and write checkpoint file)
+     */
     @Override
-    public void close() {
+    public void close(final boolean clean) {
         log.debug("{} Closing", logPrefix);
         boolean committedSuccessfully = false;
         try {
@@ -127,7 +135,8 @@ public class StandbyTask extends AbstractTask {
      *
      * @return a list of records not consumed
      */
-    public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> records) {
+    public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition,
+                                                       final List<ConsumerRecord<byte[], byte[]>> records) {
         log.debug("{} Updating standby replicas of its state store for partition [{}]", logPrefix, partition);
         return stateMgr.updateStandbyStates(partition, records);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 568a5b4..731030d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -20,8 +20,10 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -56,8 +58,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     private final Map<TopicPartition, Long> consumedOffsets;
     private final RecordCollector recordCollector;
+    private final Producer<byte[], byte[]> producer;
     private final int maxBufferedSize;
-    private final boolean exactlyOnceEnabled;
 
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
@@ -91,7 +93,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param config                the {@link StreamsConfig} specified by the user
      * @param metrics               the {@link StreamsMetrics} created by the thread
      * @param stateDirectory        the {@link StateDirectory} created by the thread
-     * @param recordCollector       the instance of {@link RecordCollector} used to produce records
+     * @param producer              the instance of {@link Producer} used to produce records
      */
     public StreamTask(final TaskId id,
                       final String applicationId,
@@ -104,11 +106,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       final StateDirectory stateDirectory,
                       final ThreadCache cache,
                       final Time time,
-                      final RecordCollector recordCollector) {
-        super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache);
+                      final Producer<byte[], byte[]> producer) {
+        super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache, config);
         punctuationQueue = new PunctuationQueue();
         maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-        exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
         this.metrics = new TaskMetrics(metrics);
 
         // create queues for each assigned partition and associate them
@@ -128,8 +129,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // initialize the consumed offset cache
         consumedOffsets = new HashMap<>();
 
-        // create the record recordCollector that maintains the produced offsets
-        this.recordCollector = recordCollector;
+        this.producer = producer;
+        recordCollector = createRecordCollector();
 
         // initialize the topology with its own context
         processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
@@ -137,16 +138,26 @@ public class StreamTask extends AbstractTask implements Punctuator {
         log.debug("{} Initializing", logPrefix);
         initializeStateStores();
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
+        if (eosEnabled) {
+            producer.initTransactions();
+            producer.beginTransaction();
+        }
         initTopology();
         processorContext.initialized();
     }
 
     /**
-     * re-initialize the task
+     * <pre>
+     * - re-initialize the task
+     * - if (eos) begin new transaction
+     * </pre>
      */
     @Override
     public void resume() {
         log.debug("{} Resuming", logPrefix);
+        if (eosEnabled) {
+            producer.beginTransaction();
+        }
         initTopology();
     }
 
@@ -229,13 +240,18 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     /**
      * <pre>
-     *  - flush state and producer
-     *  - write checkpoint
-     *  - commit offsets
+     * - flush state and producer
+     * - if(!eos) write checkpoint
+     * - commit offsets and start new transaction
      * </pre>
      */
     @Override
     public void commit() {
+        commitImpl(true);
+    }
+
+    // visible for testing
+    void commitImpl(final boolean startNewTransaction) {
         log.trace("{} Committing", logPrefix);
         metrics.metrics.measureLatencyNs(
             time,
@@ -243,8 +259,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 @Override
                 public void run() {
                     flushState();
-                    stateMgr.checkpoint(recordCollectorOffsets());
-                    commitOffsets();
+                    if (!eosEnabled) {
+                        stateMgr.checkpoint(recordCollectorOffsets());
+                    }
+                    commitOffsets(startNewTransaction);
                 }
             },
             metrics.taskCommitTimeSensor);
@@ -262,7 +280,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         recordCollector.flush();
     }
 
-    private void commitOffsets() {
+    private void commitOffsets(final boolean startNewTransaction) {
         if (commitOffsetNeeded) {
             log.debug("{} Committing offsets", logPrefix);
             final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
@@ -272,11 +290,20 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
                 stateMgr.putOffsetLimit(partition, offset);
             }
-            try {
-                consumer.commitSync(consumedOffsetsAndMetadata);
-            } catch (final CommitFailedException cfe) {
-                log.warn("{} Failed offset commits: {} ", logPrefix, consumedOffsetsAndMetadata);
-                throw cfe;
+
+            if (eosEnabled) {
+                producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
+                producer.commitTransaction();
+                if (startNewTransaction) {
+                    producer.beginTransaction();
+                }
+            } else {
+                try {
+                    consumer.commitSync(consumedOffsetsAndMetadata);
+                } catch (final CommitFailedException e) {
+                    log.warn("{} Failed offset commits {} due to {}", logPrefix, consumedOffsetsAndMetadata, e.getMessage());
+                    throw e;
+                }
             }
             commitOffsetNeeded = false;
         }
@@ -299,18 +326,33 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     /**
      * <pre>
-     *  - close topology
-     *  - {@link #commit()}
-     *    - flush state and producer
-     *    - write checkpoint
-     *    - commit offsets
+     * - close topology
+     * - {@link #commit()}
+     *   - flush state and producer
+     *   - if (!eos) write checkpoint
+     *   - commit offsets
      * </pre>
      */
     @Override
     public void suspend() {
+        suspend(true);
+    }
+
+    /**
+     * <pre>
+     * - close topology
+     * - if (clean) {@link #commit()}
+     *   - flush state and producer
+     *   - if (!eos) write checkpoint
+     *   - commit offsets
+     * </pre>
+     */
+    private void suspend(final boolean clean) {
         log.debug("{} Suspending", logPrefix);
-        closeTopology();
-        commit();
+        closeTopology(); // should we call this only on clean suspend?
+        if (clean) {
+            commitImpl(false);
+        }
     }
 
     private void closeTopology() {
@@ -339,28 +381,53 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
     /**
      * <pre>
-     *  - {@link #suspend()}
-     *    - close topology
-     *    - {@link #commit()}
-     *      - flush state and producer
-     *      - write checkpoint
-     *      - commit offsets
-     *  - close state
+     * - {@link #suspend(boolean) suspend(clean)}
+     *   - close topology
+     *   - if (clean) {@link #commit()}
+     *     - flush state and producer
+     *     - commit offsets
+     * - close state
+     *   - if (clean) write checkpoint
+     * - if (eos) close producer
      * </pre>
+     * @param clean shut down cleanly (i.e., including flush and commit) if {@code true} --
+     *              otherwise, just close open resources
      */
     @Override
-    public void close() {
+    public void close(boolean clean) {
         log.debug("{} Closing", logPrefix);
+
+        RuntimeException firstException = null;
+        try {
+            suspend(clean);
+        } catch (final RuntimeException e) {
+            clean = false;
+            firstException = e;
+            log.error("{} Could not close task due to {}", logPrefix, e);
+        }
+
+        try {
+            closeStateManager(clean);
+        } catch (final RuntimeException e) {
+            clean = false;
+            if (firstException == null) {
+                firstException = e;
+            }
+            log.error("{} Could not close state manager due to {}", logPrefix, e);
+        }
+
         try {
-            suspend();
-            closeStateManager(true);
             partitionGroup.close();
             metrics.removeAllSensors();
-        } catch (final RuntimeException e) {
-            closeStateManager(false);
-            throw e;
         } finally {
-            if (exactlyOnceEnabled) {
+            if (eosEnabled) {
+                if (!clean) {
+                    try {
+                        producer.abortTransaction();
+                    } catch (final ProducerFencedException e) {
+                        // can be ignored: if this happens, the transaction was already aborted by the broker / transaction coordinator
+                    }
+                }
                 try {
                     recordCollector.close();
                 } catch (final Throwable e) {
@@ -368,6 +435,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 }
             }
         }
+
+        if (firstException != null) {
+            throw firstException;
+        }
     }
 
     /**
@@ -450,9 +521,14 @@ public class StreamTask extends AbstractTask implements Punctuator {
         return processorContext;
     }
 
-    // for testing only
+    // visible for testing only
     RecordCollector recordCollector() {
         return recordCollector;
     }
 
+    // visible for testing only
+    RecordCollector createRecordCollector() {
+        return new RecordCollectorImpl(producer, id.toString());
+    }
+
 }

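To see what the new commit path does in terms of the plain producer API: under
exactly-once, consumed offsets are committed through the producer's transaction
rather than via consumer.commitSync(), and a new transaction is started right
after each commit. A minimal sketch of that cycle using the public transactional
API; the broker address, transactional id, topic names, and offsets below are
placeholders, and the sketch assumes a running 0.11+ broker:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class EosCommitCycleSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // one transactional producer per task, cf. createProducer(TaskId)
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-0_0");

            final Producer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());

            producer.initTransactions();   // once, when the task is created
            producer.beginTransaction();   // first transaction

            // regular processing: writes go into the open transaction
            producer.send(new ProducerRecord<byte[], byte[]>("output-topic", new byte[0]));

            // commit: offsets ride along in the same transaction, then the
            // next transaction is started (startNewTransaction == true)
            final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                new TopicPartition("input-topic", 0), new OffsetAndMetadata(42L));
            producer.sendOffsetsToTransaction(offsets, "my-app");
            producer.commitTransaction();
            producer.beginTransaction();

            // unclean close (clean == false): abort the open transaction
            producer.abortTransaction();
            producer.close();
        }
    }
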
http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7918196..f16e323 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -24,10 +24,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -305,6 +307,11 @@ public class StreamThread extends Thread {
         }
     }
 
+    interface StreamTaskAction {
+        String name();
+        void apply(final StreamTask task);
+    }
+
     /**
      * This class extends {@link StreamsMetricsImpl(Metrics, String, String, Map)} and
      * overrides one of its functions for efficiency
@@ -409,7 +416,7 @@ public class StreamThread extends Thread {
     private long lastCleanMs;
     private long lastCommitMs;
     private Throwable rebalanceException = null;
-    private final boolean exactlyOnceEnabled;
+    private final boolean eosEnabled;
 
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
@@ -451,7 +458,7 @@ public class StreamThread extends Thread {
             log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes.", logPrefix);
         }
         cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
-        exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
+        eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
 
 
         // set the consumer clients
@@ -507,8 +514,10 @@ public class StreamThread extends Thread {
     public void run() {
         log.info("{} Starting", logPrefix);
 
+        boolean cleanRun = false;
         try {
             runLoop();
+            cleanRun = true;
         } catch (final KafkaException e) {
             // just re-throw the exception as it should be logged already
             throw e;
@@ -518,7 +527,7 @@ public class StreamThread extends Thread {
             log.error("{} Streams application error during processing: {}", logPrefix, e);
             throw e;
         } finally {
-            shutdown();
+            shutdown(cleanRun);
         }
     }
 
@@ -568,7 +577,9 @@ public class StreamThread extends Thread {
         }
 
         if (rebalanceException != null) {
-            throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException);
+            if (!(rebalanceException instanceof ProducerFencedException)) {
+                throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException);
+            }
         }
 
         return records;
@@ -651,14 +662,22 @@ public class StreamThread extends Thread {
         // until no task has any records left
         do {
             totalProcessedEachRound = 0;
-            for (final StreamTask task : tasks.values()) {
-                // we processed one record,
-                // and more are buffered waiting for the next round
-                if (task.process()) {
-                    totalProcessedEachRound++;
-                    totalProcessedSinceLastMaybeCommit++;
+            final Iterator<Map.Entry<TaskId, StreamTask>> it = tasks.entrySet().iterator();
+            while (it.hasNext()) {
+                final StreamTask task = it.next().getValue();
+                try {
+                    // we processed one record;
+                    // if more are buffered, they are picked up in the next round
+                    if (task.process()) {
+                        totalProcessedEachRound++;
+                        totalProcessedSinceLastMaybeCommit++;
+                    }
+                } catch (final ProducerFencedException e) {
+                    closeZombieTask(task);
+                    it.remove();
                 }
             }
+
             if (recordsProcessedBeforeCommit != UNLIMITED_RECORDS &&
                 totalProcessedSinceLastMaybeCommit >= recordsProcessedBeforeCommit) {
                 totalProcessedSinceLastMaybeCommit = 0;
@@ -670,11 +689,25 @@ public class StreamThread extends Thread {
         } while (totalProcessedEachRound != 0);
 
         // go over the tasks again to punctuate or commit
-        for (final StreamTask task : tasks.values()) {
-            maybePunctuate(task);
-            if (task.commitNeeded()) {
-                commitOne(task);
+        final RuntimeException e = performOnStreamTasks(new StreamTaskAction() {
+            private String name;
+            @Override
+            public String name() {
+                return name;
             }
+
+            @Override
+            public void apply(final StreamTask task) {
+                name = "punctuate";
+                maybePunctuate(task);
+                if (task.commitNeeded()) {
+                    name = "commit";
+                    commitOne(task);
+                }
+            }
+        });
+        if (e != null) {
+            throw e;
         }
 
         return totalProcessedSinceLastMaybeCommit;
@@ -745,9 +778,21 @@ public class StreamThread extends Thread {
      * Commit the states of all its tasks
      */
     private void commitAll() {
-        for (final StreamTask task : activeTasks.values()) {
-            commitOne(task);
+        final RuntimeException e = performOnStreamTasks(new StreamTaskAction() {
+            @Override
+            public String name() {
+                return "commit";
+            }
+
+            @Override
+            public void apply(final StreamTask task) {
+                commitOne(task);
+            }
+        });
+        if (e != null) {
+            throw e;
         }
+
         for (final StandbyTask task : standbyTasks.values()) {
             commitOne(task);
         }
@@ -762,10 +807,10 @@ public class StreamThread extends Thread {
             task.commit();
         } catch (final CommitFailedException e) {
             // commit failed. Just log it.
-            log.warn("{} Failed to commit {} {} state: {}", logPrefix, task.getClass().getSimpleName(), task.id(), e);
+            log.warn("{} Failed to commit {} {} state: ", logPrefix, task.getClass().getSimpleName(), task.id(), e);
         } catch (final KafkaException e) {
             // commit failed due to an unexpected exception. Log it and rethrow the exception.
-            log.error("{} Failed to commit {} {} state: {}", logPrefix, task.getClass().getSimpleName(), task.id(), e);
+            log.error("{} Failed to commit {} {} state: ", logPrefix, task.getClass().getSimpleName(), task.id(), e);
             throw e;
         }
 
@@ -983,9 +1028,9 @@ public class StreamThread extends Thread {
         this.partitionAssignor = partitionAssignor;
     }
 
-    private void shutdown() {
+    private void shutdown(final boolean cleanRun) {
         log.info("{} Shutting down", logPrefix);
-        shutdownTasksAndState();
+        shutdownTasksAndState(cleanRun);
 
         // close all embedded clients
         if (threadProducer != null) {
@@ -1022,7 +1067,7 @@ public class StreamThread extends Thread {
     }
 
     @SuppressWarnings("ThrowableNotThrown")
-    private void shutdownTasksAndState() {
+    private void shutdownTasksAndState(final boolean cleanRun) {
         log.debug("{} shutdownTasksAndState: shutting down" +
                 "active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}",
             logPrefix, activeTasks.keySet(), standbyTasks.keySet(),
@@ -1030,7 +1075,7 @@ public class StreamThread extends Thread {
 
         for (final AbstractTask task : allTasks()) {
             try {
-                task.close();
+                task.close(cleanRun);
             } catch (final RuntimeException e) {
                 log.error("{} Failed while closing {} {} due to {}: ",
                     logPrefix,
@@ -1054,7 +1099,19 @@ public class StreamThread extends Thread {
 
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        for (final AbstractTask task : activeAndStandbytasks()) {
+        firstException.compareAndSet(null, performOnStreamTasks(new StreamTaskAction() {
+            @Override
+            public String name() {
+                return "suspend";
+            }
+
+            @Override
+            public void apply(final StreamTask task) {
+                task.suspend();
+            }
+        }));
+
+        for (final StandbyTask task : standbyTasks.values()) {
             try {
                 task.suspend();
             } catch (final RuntimeException e) {
@@ -1131,7 +1188,7 @@ public class StreamThread extends Thread {
             if (!task.partitions().equals(assignedPartitionsForTask)) {
                 log.debug("{} Closing suspended non-assigned active task {}", logPrefix, task.id());
                 try {
-                    task.close();
+                    task.close(true);
                 } catch (final Exception e) {
                     log.error("{} Failed to remove suspended task {}: {}", logPrefix, next.getKey(), e);
                 } finally {
@@ -1150,7 +1207,7 @@ public class StreamThread extends Thread {
                 final StandbyTask task = suspendedTask.getValue();
                 log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, task.id());
                 try {
-                    task.close();
+                    task.close(true);
                 } catch (final Exception e) {
                     log.error("{} Failed to remove suspended standby task {}: {}", logPrefix, task.id(), e);
                 } finally {
@@ -1165,27 +1222,32 @@ public class StreamThread extends Thread {
 
         streamsMetrics.taskCreatedSensor.record();
 
-        return new StreamTask(
-            id,
-            applicationId,
-            partitions,
-            builder.build(id.topicGroupId),
-            consumer,
-            storeChangelogReader,
-            config,
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            createRecordCollector(id));
+        try {
+            return new StreamTask(
+                id,
+                applicationId,
+                partitions,
+                builder.build(id.topicGroupId),
+                consumer,
+                storeChangelogReader,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                time,
+                createProducer(id));
+        } finally {
+            log.info("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions);
+        }
     }
 
-    private RecordCollector createRecordCollector(final TaskId id) {
+    private Producer<byte[], byte[]> createProducer(final TaskId id) {
         final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId);
 
         final Producer<byte[], byte[]> producer;
-        if (exactlyOnceEnabled) {
+        if (eosEnabled) {
             log.info("{} Creating producer client for task {}", logPrefix, id);
+            producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + id);
             producer = clientSupplier.getProducer(producerConfigs);
         } else {
             if (threadProducer == null) {
@@ -1195,7 +1257,7 @@ public class StreamThread extends Thread {
             producer = threadProducer;
         }
 
-        return new RecordCollectorImpl(producer, id.toString());
+        return producer;
     }
 
     private void addStreamTasks(final Collection<TopicPartition> assignment, final long start) {
@@ -1354,4 +1416,40 @@ public class StreamThread extends Thread {
         standbyRecords.clear();
     }
 
+    private void closeZombieTask(final StreamTask task) {
+        log.warn("{} Producer of task {} fenced; closing zombie task.", logPrefix, task.id);
+        try {
+            task.close(false);
+        } catch (final Exception f) {
+            log.warn("{} Failed to close zombie task: ", logPrefix, f);
+        }
+        activeTasks.remove(task.id);
+    }
+
+
+    private RuntimeException performOnStreamTasks(final StreamTaskAction action) {
+        RuntimeException firstException = null;
+        final Iterator<Map.Entry<TaskId, StreamTask>> it = activeTasks.entrySet().iterator();
+        while (it.hasNext()) {
+            final StreamTask task = it.next().getValue();
+            try {
+                action.apply(task);
+            } catch (final ProducerFencedException e) {
+                closeZombieTask(task);
+                it.remove();
+            } catch (final RuntimeException t) {
+                log.error("{} Failed to {} stream task {} due to: {}",
+                    logPrefix,
+                    action.name(),
+                    task.id(),
+                    t);
+                if (firstException == null) {
+                    firstException = t;
+                }
+            }
+        }
+
+        return firstException;
+    }
+
 }

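The common thread in the new StreamThread logic is how ProducerFencedException
is treated: it means another instance with the same transactional id has taken
over the task, so the local task is a zombie that must be closed uncleanly and
dropped, while any other runtime failure is remembered and rethrown after the
loop. A small self-contained sketch of that pattern; the Task interface below
is a placeholder, not the real task type:

    import java.util.Iterator;
    import java.util.Map;

    import org.apache.kafka.common.errors.ProducerFencedException;

    public class FencedTaskHandlingSketch {

        // hypothetical minimal task abstraction for the sketch
        interface Task {
            void commit();
            void close(boolean clean);
        }

        // apply commit() to every task: fenced tasks are closed as zombies and
        // removed; the first non-fencing failure is returned to the caller
        static RuntimeException commitAll(final Map<String, Task> tasks) {
            RuntimeException firstException = null;
            final Iterator<Map.Entry<String, Task>> it = tasks.entrySet().iterator();
            while (it.hasNext()) {
                final Task task = it.next().getValue();
                try {
                    task.commit();
                } catch (final ProducerFencedException e) {
                    // another instance owns this transactional id now: close
                    // without committing and stop tracking the task
                    task.close(false);
                    it.remove();
                } catch (final RuntimeException e) {
                    if (firstException == null) {
                        firstException = e;
                    }
                }
            }
            return firstException;
        }
    }
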
http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 54d2165..8c14737 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -57,27 +57,28 @@ public class OffsetCheckpoint {
     private final File file;
     private final Object lock;
 
-    public OffsetCheckpoint(File file) {
+    public OffsetCheckpoint(final File file) {
         this.file = file;
-        this.lock = new Object();
+        lock = new Object();
     }
 
     /**
      * @throws IOException if any file operation fails with an IO exception
      */
-    public void write(Map<TopicPartition, Long> offsets) throws IOException {
+    public void write(final Map<TopicPartition, Long> offsets) throws IOException {
         synchronized (lock) {
             // write to temp file and then swap with the existing file
-            File temp = new File(file.getAbsolutePath() + ".tmp");
+            final File temp = new File(file.getAbsolutePath() + ".tmp");
 
-            FileOutputStream fileOutputStream = new FileOutputStream(temp);
+            final FileOutputStream fileOutputStream = new FileOutputStream(temp);
             try (BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
                 writeIntLine(writer, VERSION);
                 writeIntLine(writer, offsets.size());
 
-                for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+                for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
                     writeEntry(writer, entry.getKey(), entry.getValue());
+                }
 
                 writer.flush();
                 fileOutputStream.getFD().sync();
@@ -90,7 +91,8 @@ public class OffsetCheckpoint {
     /**
      * @throws IOException if file write operations failed with any IO exception
      */
-    private void writeIntLine(BufferedWriter writer, int number) throws IOException {
+    private void writeIntLine(final BufferedWriter writer,
+                              final int number) throws IOException {
         writer.write(Integer.toString(number));
         writer.newLine();
     }
@@ -98,7 +100,9 @@ public class OffsetCheckpoint {
     /**
      * @throws IOException if file write operations failed with any IO exception
      */
-    private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
+    private void writeEntry(final BufferedWriter writer,
+                            final TopicPartition part,
+                            final long offset) throws IOException {
         writer.write(part.topic());
         writer.write(' ');
         writer.write(Integer.toString(part.partition()));
@@ -114,43 +118,39 @@ public class OffsetCheckpoint {
      */
     public Map<TopicPartition, Long> read() throws IOException {
         synchronized (lock) {
-            BufferedReader reader;
-            try {
-                reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
-            } catch (FileNotFoundException e) {
-                return Collections.emptyMap();
-            }
+            try (BufferedReader reader
+                     = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
 
-            try {
-                int version = readInt(reader);
+                final int version = readInt(reader);
                 switch (version) {
                     case 0:
-                        int expectedSize = readInt(reader);
-                        Map<TopicPartition, Long> offsets = new HashMap<>();
+                        final int expectedSize = readInt(reader);
+                        final Map<TopicPartition, Long> offsets = new HashMap<>();
                         String line = reader.readLine();
                         while (line != null) {
-                            String[] pieces = line.split("\\s+");
-                            if (pieces.length != 3)
-                                throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
-                                    line));
-
-                            String topic = pieces[0];
-                            int partition = Integer.parseInt(pieces[1]);
-                            long offset = Long.parseLong(pieces[2]);
+                            final String[] pieces = line.split("\\s+");
+                            if (pieces.length != 3) {
+                                throw new IOException(
+                                    String.format("Malformed line in offset checkpoint file: '%s'.", line));
+                            }
+
+                            final String topic = pieces[0];
+                            final int partition = Integer.parseInt(pieces[1]);
+                            final long offset = Long.parseLong(pieces[2]);
                             offsets.put(new TopicPartition(topic, partition), offset);
                             line = reader.readLine();
                         }
-                        if (offsets.size() != expectedSize)
-                            throw new IOException(String.format("Expected %d entries but found only %d",
-                                expectedSize,
-                                offsets.size()));
+                        if (offsets.size() != expectedSize) {
+                            throw new IOException(
+                                String.format("Expected %d entries but found only %d", expectedSize, offsets.size()));
+                        }
                         return offsets;
 
                     default:
                         throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
                 }
-            } finally {
-                reader.close();
+            } catch (final FileNotFoundException e) {
+                return Collections.emptyMap();
             }
         }
     }
@@ -158,10 +158,11 @@ public class OffsetCheckpoint {
     /**
      * @throws IOException if file read ended prematurely
      */
-    private int readInt(BufferedReader reader) throws IOException {
-        String line = reader.readLine();
-        if (line == null)
+    private int readInt(final BufferedReader reader) throws IOException {
+        final String line = reader.readLine();
+        if (line == null) {
             throw new EOFException("File ended prematurely.");
+        }
         return Integer.parseInt(line);
     }
 
@@ -169,12 +170,12 @@ public class OffsetCheckpoint {
      * @throws IOException if there is any IO exception during delete
      */
     public void delete() throws IOException {
-        Files.delete(file.toPath());
+        Files.deleteIfExists(file.toPath());
     }
 
     @Override
     public String toString() {
-        return this.file.getAbsolutePath();
+        return file.getAbsolutePath();
     }
 
 }

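For reference, the checkpoint file that OffsetCheckpoint reads and writes is a
tiny plain-text format: a version line, an entry-count line, and then one
"topic partition offset" triple per line. A short sketch of parsing one entry
line, matching the split("\\s+") logic above; the file contents and values are
made up for illustration:

    import java.io.IOException;

    import org.apache.kafka.common.TopicPartition;

    public class CheckpointLineSketch {
        // example file contents:
        //   0                            <- version
        //   2                            <- number of entries
        //   my-store-changelog 0 42
        //   my-store-changelog 1 17
        public static void main(final String[] args) throws IOException {
            final String line = "my-store-changelog 0 42";
            final String[] pieces = line.split("\\s+");
            if (pieces.length != 3) {
                throw new IOException(
                    String.format("Malformed line in offset checkpoint file: '%s'.", line));
            }
            final TopicPartition partition =
                new TopicPartition(pieces[0], Integer.parseInt(pieces[1]));
            final long offset = Long.parseLong(pieces[2]);
            System.out.println(partition + " -> " + offset);
        }
    }
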
