kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: handle commit failed exception on stream thread's suspend task
Date Tue, 01 Aug 2017 21:52:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 7e0ec9c76 -> 452e2eedb


HOTFIX: handle commit failed exception on stream thread's suspend task

1. Capture `CommitFailedException` in `StreamThread#suspendTasksAndState`.

2. Remove `Cache` from AbstractTask as it is no longer needed; remove unused cleanup-related
variables from StreamThread (cc dguy to double check).

3. Also fix log4j outputs for error and warn, such that for WARN we do not print the stack trace,
and for ERROR we remove the dangling colon since the exception stack trace will start on a new line.

4. Update one log4j entry to always print as WARN for errors closing a zombie task (cc mjsax).

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3574 from guozhangwang/KHotfix-handle-commit-failed-exception-in-suspend

(cherry picked from commit 228a4fdb6dc15d1b8615615e00825e7ce53a41fa)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>

fix unit test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/452e2eed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/452e2eed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/452e2eed

Branch: refs/heads/0.11.0
Commit: 452e2eedb84b77c6f16085cf5b59d9cadb609149
Parents: 7e0ec9c
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue Aug 1 12:28:24 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Aug 1 14:51:52 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       | 15 ++---
 .../processor/internals/StandbyTask.java        |  2 +-
 .../streams/processor/internals/StreamTask.java | 39 ++++++------
 .../processor/internals/StreamThread.java       | 67 +++++++++++---------
 .../processor/internals/AbstractTaskTest.java   |  3 -
 .../processor/internals/StreamThreadTest.java   | 56 +++++++++++++++-
 6 files changed, 115 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 02129ab..8427e11 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,14 +43,14 @@ public abstract class AbstractTask {
     final TaskId id;
     final String applicationId;
     final ProcessorTopology topology;
-    final Consumer consumer;
     final ProcessorStateManager stateMgr;
     final Set<TopicPartition> partitions;
-    InternalProcessorContext processorContext;
-    private final ThreadCache cache;
+    final Consumer consumer;
     final String logPrefix;
     final boolean eosEnabled;
 
+    InternalProcessorContext processorContext;
+
     /**
      * @throws ProcessorStateException if the state manager cannot be created
      */
@@ -63,15 +62,13 @@ public abstract class AbstractTask {
                  final ChangelogReader changelogReader,
                  final boolean isStandby,
                  final StateDirectory stateDirectory,
-                 final ThreadCache cache,
                  final StreamsConfig config) {
         this.id = id;
         this.applicationId = applicationId;
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
         this.consumer = consumer;
-        this.cache = cache;
-        eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
+        this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
 
         logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", id());
 
@@ -116,10 +113,6 @@ public abstract class AbstractTask {
         return processorContext;
     }
 
-    public final ThreadCache cache() {
-        return cache;
-    }
-
     public StateStore getStore(final String name) {
         return stateMgr.getStore(name);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 8d518ae..df3bea4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -59,7 +59,7 @@ public class StandbyTask extends AbstractTask {
                 final StreamsConfig config,
                 final StreamsMetrics metrics,
                 final StateDirectory stateDirectory) {
-        super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory,
null, config);
+        super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory,
config);
 
         // initialize the topology with its own context
         processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 71c9e69..c337911 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -108,7 +108,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       final ThreadCache cache,
                       final Time time,
                       final Producer<byte[], byte[]> producer) {
-        super(id, applicationId, partitions, topology, consumer, changelogReader, false,
stateDirectory, cache, config);
+        super(id, applicationId, partitions, topology, consumer, changelogReader, false,
stateDirectory, config);
         punctuationQueue = new PunctuationQueue();
         maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         this.metrics = new TaskMetrics(metrics);
@@ -254,7 +254,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
     @Override
     public void commit() {
         commit(true);
-
     }
 
     // visible for testing
@@ -312,7 +311,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 try {
                     consumer.commitSync(consumedOffsetsAndMetadata);
                 } catch (final CommitFailedException e) {
-                    log.warn("{} Failed offset commits {}: ", logPrefix, consumedOffsetsAndMetadata,
e);
+                    log.warn("{} Failed offset commits {} due to CommitFailedException",
logPrefix, consumedOffsetsAndMetadata);
                     throw e;
                 }
             }
@@ -401,7 +400,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
             if (firstException == null) {
                 firstException = e;
             }
-            log.error("{} Could not close state manager: ", logPrefix, e);
+            log.error("{} Could not close state manager due to the following error:", logPrefix,
e);
         }
 
         try {
@@ -420,7 +419,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 try {
                     recordCollector.close();
                 } catch (final Throwable e) {
-                    log.error("{} Failed to close producer: ", logPrefix, e);
+                    log.error("{} Failed to close producer due to the following error:",
logPrefix, e);
                 }
             }
         }
@@ -430,20 +429,20 @@ public class StreamTask extends AbstractTask implements Punctuator {
         }
     }
 
-        /**
-         * <pre>
-         * - {@link #suspend(boolean) suspend(clean)}
-         *   - close topology
-         *   - if (clean) {@link #commit()}
-         *     - flush state and producer
-         *     - commit offsets
-         * - close state
-         *   - if (clean) write checkpoint
-         * - if (eos) close producer
-         * </pre>
-         * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
-         *              otherwise, just close open resources
-         */
+    /**
+     * <pre>
+     * - {@link #suspend(boolean) suspend(clean)}
+     *   - close topology
+     *   - if (clean) {@link #commit()}
+     *     - flush state and producer
+     *     - commit offsets
+     * - close state
+     *   - if (clean) write checkpoint
+     * - if (eos) close producer
+     * </pre>
+     * @param clean shut down cleanly (ie, incl. flush and commit) if {@code true} --
+     *              otherwise, just close open resources
+     */
     @Override
     public void close(boolean clean) {
         log.debug("{} Closing", logPrefix);
@@ -454,7 +453,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         } catch (final RuntimeException e) {
             clean = false;
             firstException = e;
-            log.error("{} Could not close task: ", logPrefix, e);
+            log.error("{} Could not close task due to the following error:", logPrefix, e);
         }
 
         closeSuspended(clean, firstException);

http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index db42061..9774627 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -257,7 +257,7 @@ public class StreamThread extends Thread {
                     } catch (final LockException e) {
                         // ignore and retry
                         if (!retryingTasks.contains(taskId)) {
-                            log.warn("{} Could not create task {}. Will retry: ", logPrefix,
taskId, e);
+                            log.warn("{} Could not create task {} due to {}; will retry",
logPrefix, taskId, e);
                             retryingTasks.add(taskId);
                         }
                     }
@@ -455,7 +455,7 @@ public class StreamThread extends Thread {
         streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread."
+ threadClientId,
             Collections.singletonMap("client-id", threadClientId));
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
-            log.warn("{} Negative cache size passed in thread. Reverting to cache size of
0 bytes.", logPrefix);
+            log.warn("{} Negative cache size passed in thread. Reverting to cache size of
0 bytes", logPrefix);
         }
         cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
         eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
@@ -501,7 +501,6 @@ public class StreamThread extends Thread {
         lastCommitMs = timerStartedMs;
         rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
         setState(State.RUNNING);
-
     }
 
     /**
@@ -524,7 +523,7 @@ public class StreamThread extends Thread {
         } catch (final Exception e) {
             // we have caught all Kafka related exceptions, and other runtime exceptions
             // should be due to user application errors
-            log.error("{} Streams application error during processing: ", logPrefix, e);
+            log.error("{} Encountered the following error during processing:", logPrefix,
e);
             throw e;
         } finally {
             shutdown(cleanRun);
@@ -668,6 +667,9 @@ public class StreamThread extends Thread {
                 try {
                     // we processed one record,
                     // if more are buffered waiting for the next round
+
+                    // TODO: We should check for stream time punctuation right after each
process call
+                    //       of the task instead of only calling it after all records being
processed
                     if (task.process()) {
                         totalProcessedEachRound++;
                         totalProcessedSinceLastMaybeCommit++;
@@ -714,6 +716,7 @@ public class StreamThread extends Thread {
                 }
             }
         });
+
         if (e != null) {
             throw e;
         }
@@ -729,7 +732,7 @@ public class StreamThread extends Thread {
                 streamsMetrics.punctuateTimeSensor.record(computeLatency(), timerStartedMs);
             }
         } catch (final KafkaException e) {
-            log.error("{} Failed to punctuate active task {}: ", logPrefix, task.id(), e);
+            log.error("{} Failed to punctuate active task {} due to the following error:",
logPrefix, task.id(), e);
             throw e;
         }
     }
@@ -819,11 +822,11 @@ public class StreamThread extends Thread {
         try {
             task.commit();
         } catch (final CommitFailedException e) {
-            // commit failed. Just log it.
-            log.warn("{} Failed to commit {} {} state: ", logPrefix, task.getClass().getSimpleName(),
task.id(), e);
+            // commit failed. This is already logged inside the task as WARN and we can just
log it again here.
+            log.warn("{} Failed to commit {} {} state due to CommitFailedException; this
task may be no longer owned by the thread", logPrefix, task.getClass().getSimpleName(), task.id());
         } catch (final KafkaException e) {
             // commit failed due to an unexpected exception. Log it and rethrow the exception.
-            log.error("{} Failed to commit {} {} state: ", logPrefix, task.getClass().getSimpleName(),
task.id(), e);
+            log.error("{} Failed to commit {} {} state due to the following error:", logPrefix,
task.getClass().getSimpleName(), task.id(), e);
             throw e;
         }
 
@@ -1052,23 +1055,23 @@ public class StreamThread extends Thread {
             try {
                 threadProducer.close();
             } catch (final Throwable e) {
-                log.error("{} Failed to close producer: ", logPrefix, e);
+                log.error("{} Failed to close producer due to the following error:", logPrefix,
e);
             }
         }
         try {
             consumer.close();
         } catch (final Throwable e) {
-            log.error("{} Failed to close consumer: ", logPrefix, e);
+            log.error("{} Failed to close consumer due to the following error:", logPrefix,
e);
         }
         try {
             restoreConsumer.close();
         } catch (final Throwable e) {
-            log.error("{} Failed to close restore consumer: ", logPrefix, e);
+            log.error("{} Failed to close restore consumer due to the following error:",
logPrefix, e);
         }
         try {
             partitionAssignor.close();
         } catch (final Throwable e) {
-            log.error("{} Failed to close KafkaStreamClient: ", logPrefix, e);
+            log.error("{} Failed to close KafkaStreamClient due to the following error:",
logPrefix, e);
         }
 
         removeStreamTasks();
@@ -1091,7 +1094,7 @@ public class StreamThread extends Thread {
             try {
                 task.close(cleanRun);
             } catch (final RuntimeException e) {
-                log.error("{} Failed while closing {} {}: ",
+                log.error("{} Failed while closing {} {} due to the following error:",
                     logPrefix,
                     task.getClass().getSimpleName(),
                     task.id(),
@@ -1123,11 +1126,15 @@ public class StreamThread extends Thread {
             public void apply(final StreamTask task) {
                 try {
                     task.suspend();
+                } catch (final CommitFailedException e) {
+                    // commit failed during suspension. Just log it.
+                    log.warn("{} Failed to commit task {} state when suspending due to CommitFailedException",
logPrefix, task.id);
                 } catch (final Exception e) {
+                    log.error("{} Suspending task {} failed due to the following error:",
logPrefix, task.id, e);
                     try {
                         task.close(false);
                     } catch (final Exception f) {
-                        log.error("{} Closing task {} failed: ", logPrefix, task.id, f);
+                        log.error("{} After suspending failed, closing the same task {} failed
again due to the following error:", logPrefix, task.id, f);
                     }
                     throw e;
                 }
@@ -1139,10 +1146,11 @@ public class StreamThread extends Thread {
                 try {
                     task.suspend();
                 } catch (final Exception e) {
+                    log.error("{} Suspending standby task {} failed due to the following
error:", logPrefix, task.id, e);
                     try {
                         task.close(false);
                     } catch (final Exception f) {
-                        log.error("{} Closing standby task {} failed: ", logPrefix, task.id,
f);
+                        log.error("{} After suspending failed, closing the same standby task
{} failed again due to the following error:", logPrefix, task.id, f);
                     }
                     throw e;
                 }
@@ -1166,7 +1174,7 @@ public class StreamThread extends Thread {
             // un-assign the change log partitions
             restoreConsumer.assign(Collections.<TopicPartition>emptyList());
         } catch (final RuntimeException e) {
-            log.error("{} Failed to un-assign change log partitions: ", logPrefix, e);
+            log.error("{} Failed to un-assign change log partitions due to the following
error:", logPrefix, e);
             return e;
         }
         return null;
@@ -1193,7 +1201,7 @@ public class StreamThread extends Thread {
     private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set<TopicPartition>
partitions) {
         if (suspendedTasks.containsKey(taskId)) {
             final StreamTask task = suspendedTasks.get(taskId);
-            if (task.partitions.equals(partitions)) {
+            if (task.partitions().equals(partitions)) {
                 return task;
             }
         }
@@ -1203,7 +1211,7 @@ public class StreamThread extends Thread {
     private StandbyTask findMatchingSuspendedStandbyTask(final TaskId taskId, final Set<TopicPartition>
partitions) {
         if (suspendedStandbyTasks.containsKey(taskId)) {
             final StandbyTask task = suspendedStandbyTasks.get(taskId);
-            if (task.partitions.equals(partitions)) {
+            if (task.partitions().equals(partitions)) {
                 return task;
             }
         }
@@ -1218,11 +1226,11 @@ public class StreamThread extends Thread {
             final StreamTask task = next.getValue();
             final Set<TopicPartition> assignedPartitionsForTask = newTaskAssignment.get(next.getKey());
             if (!task.partitions().equals(assignedPartitionsForTask)) {
-                log.debug("{} Closing suspended non-assigned active task {}", logPrefix,
task.id());
+                log.debug("{} Closing suspended and not re-assigned task {}", logPrefix,
task.id());
                 try {
                     task.closeSuspended(true, null);
                 } catch (final Exception e) {
-                    log.error("{} Failed to remove suspended task {}: ", logPrefix, next.getKey(),
e);
+                    log.error("{} Failed to close suspended task {} due to the following
error:", logPrefix, next.getKey(), e);
                 } finally {
                     suspendedTaskIterator.remove();
                 }
@@ -1237,11 +1245,11 @@ public class StreamThread extends Thread {
             final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next();
             if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) {
                 final StandbyTask task = suspendedTask.getValue();
-                log.debug("{} Closing suspended non-assigned standby task {}", logPrefix,
task.id());
+                log.debug("{} Closing suspended and not re-assigned standby task {}", logPrefix,
task.id());
                 try {
                     task.close(true);
                 } catch (final Exception e) {
-                    log.error("{} Failed to remove suspended standby task {}: ", logPrefix,
task.id(), e);
+                    log.error("{} Failed to remove suspended standby task {} due to the following
error:", logPrefix, task.id(), e);
                 } finally {
                     standByTaskIterator.remove();
                 }
@@ -1321,7 +1329,7 @@ public class StreamThread extends Thread {
                         newTasks.put(taskId, partitions);
                     }
                 } catch (final StreamsException e) {
-                    log.error("{} Failed to create an active task {}: ", logPrefix, taskId,
e);
+                    log.error("{} Failed to create an active task {} due to the following
error:", logPrefix, taskId, e);
                     throw e;
                 }
             } else {
@@ -1434,7 +1442,7 @@ public class StreamThread extends Thread {
             activeTasks.clear();
             activeTasksByPartition.clear();
         } catch (final Exception e) {
-            log.error("{} Failed to remove stream tasks: ", logPrefix, e);
+            log.error("{} Failed to remove stream tasks due to the following error:", logPrefix,
e);
         }
     }
 
@@ -1447,14 +1455,11 @@ public class StreamThread extends Thread {
     }
 
     private void closeZombieTask(final StreamTask task) {
-        log.warn("{} Producer of task {} fenced; closing zombie task.", logPrefix, task.id);
+        log.warn("{} Producer of task {} fenced; closing zombie task", logPrefix, task.id);
         try {
             task.close(false);
-        } catch (final Exception f) {
-            if (!log.isDebugEnabled() && !log.isTraceEnabled()) {
-                log.warn("{} Failed to close zombie task: {}", logPrefix, f.getMessage());
-            }
-            log.debug("{} Failed to close zombie task: ", logPrefix, f);
+        } catch (final Exception e) {
+            log.warn("{} Failed to close zombie task due to {}, ignore and proceed", logPrefix,
e);
         }
         activeTasks.remove(task.id);
     }
@@ -1471,7 +1476,7 @@ public class StreamThread extends Thread {
                 closeZombieTask(task);
                 it.remove();
             } catch (final RuntimeException t) {
-                log.error("{} Failed to {} stream task {}: ",
+                log.error("{} Failed to {} stream task {} due to the following error:",
                     logPrefix,
                     action.name(),
                     task.id(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index ba3230a..123cbf0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -24,14 +24,12 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
@@ -80,7 +78,6 @@ public class AbstractTaskTest {
                                 new StoreChangelogReader(consumer, Time.SYSTEM, 5000),
                                 false,
                                 new StateDirectory("app", TestUtils.tempDirectory().getPath(),
time),
-                                new ThreadCache("testCache", 0, new MockStreamsMetrics(new
Metrics())),
                                 config) {
             @Override
             public void resume() {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/452e2eed/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a0882cf..d8d2e4f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -93,7 +94,6 @@ public class StreamThreadTest {
     private final StreamsConfig config = new StreamsConfig(configProps(false));
 
 
-
     @Before
     public void setUp() throws Exception {
         processId = UUID.randomUUID();
@@ -1533,6 +1533,60 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldCaptureCommitFailedExceptionOnTaskSuspension() throws Exception {
+        builder.stream("t1");
+
+        final TestStreamTask testStreamTask = new TestStreamTask(
+                new TaskId(0, 0),
+                applicationId,
+                Utils.mkSet(new TopicPartition("t1", 0)),
+                builder.build(0),
+                clientSupplier.consumer,
+                clientSupplier.getProducer(new HashMap<String, Object>()),
+                clientSupplier.restoreConsumer,
+                config,
+                new MockStreamsMetrics(new Metrics()),
+                new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG),
mockTime)) {
+
+            @Override
+            public void suspend() {
+                throw new CommitFailedException();
+            }
+        };
+
+        final StreamThread thread = new StreamThread(
+                builder,
+                config,
+                clientSupplier,
+                applicationId,
+                clientId,
+                processId,
+                metrics,
+                mockTime,
+                new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+                0) {
+
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition>
partitions) {
+                return testStreamTask;
+            }
+        };
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(testStreamTask.id(), testStreamTask.partitions);
+
+        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+
+        assertFalse(testStreamTask.committed);
+    }
+
+
+    @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() throws
Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         builder.setApplicationId(applicationId);


Mime
View raw message