kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-3708: Better exception handling in Kafka Streams
Date Wed, 28 Sep 2016 18:07:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c526c0c3f -> d83cde7ca


KAFKA-3708: Better exception handling in Kafka Streams

KafkaExceptions thrown from within StreamThread/StreamTask currently bubble up without any
additional context. This makes it hard to figure out where something went wrong, e.g.,
which topic had the serialization exception.
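
As a rough illustration (not part of this commit), the sketch below shows where the extra context
becomes visible to an application: an uncaught-exception handler on the stream thread now sees a
StreamsException whose message names the task, processor, topic, partition and offset involved.
It assumes the 0.10.x-era KStreamBuilder API; the application id, bootstrap server and topic
names are placeholders.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ContextualErrorsExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "error-context-demo"); // placeholder app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input").to("output"); // placeholder topics

            KafkaStreams streams = new KafkaStreams(builder, props);

            // A serialization or processing failure inside StreamThread/StreamTask now reaches
            // this handler wrapped in a StreamsException whose message carries taskId, processor,
            // topic, partition and offset, instead of a bare KafkaException.
            streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    System.err.println("stream thread " + t.getName() + " died: " + e.getMessage());
                }
            });

            streams.start();
        }
    }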

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1819 from dguy/kafka-3708 and squashes the following commits:

d6feaa8 [Damian Guy] address comments
15b89e7 [Damian Guy] merge trunk
6b8a8af [Damian Guy] catch exceptions in various places and throw more informative versions
c86eeda [Damian Guy] fix conflicts
8f37e2c [Damian Guy] add some context to exceptions


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d83cde7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d83cde7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d83cde7c

Branch: refs/heads/trunk
Commit: d83cde7cabe4e86951c6760e68e65b99752cfe0e
Parents: c526c0c
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Sep 28 11:07:44 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Sep 28 11:07:44 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 23 +++--
 .../processor/internals/AbstractTask.java       | 15 +++-
 .../processor/internals/ProcessorNode.java      | 13 ++-
 .../internals/ProcessorStateManager.java        | 31 +++++--
 .../processor/internals/RecordCollector.java    | 37 +++++---
 .../processor/internals/RecordQueue.java        | 19 +++-
 .../streams/processor/internals/StreamTask.java | 13 +++
 .../processor/internals/StreamThread.java       |  9 --
 .../apache/kafka/streams/StreamsConfigTest.java | 39 +++++++++
 .../processor/internals/AbstractTaskTest.java   | 92 ++++++++++++++++++++
 .../processor/internals/ProcessorNodeTest.java  | 65 ++++++++++++++
 .../internals/RecordCollectorTest.java          | 42 +++++++++
 .../processor/internals/RecordQueueTest.java    | 21 +++++
 .../processor/internals/StreamTaskTest.java     | 89 +++++++++++++++++++
 14 files changed, 469 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 23b5287..6c88b11 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
@@ -412,17 +413,23 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     public Serde keySerde() {
-        Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
-        serde.configure(originals(), true);
-
-        return serde;
+        try {
+            Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
+            serde.configure(originals(), true);
+            return serde;
+        } catch (Exception e) {
+            throw new StreamsException(String.format("Failed to configure key serde %s", get(StreamsConfig.KEY_SERDE_CLASS_CONFIG)), e);
+        }
     }
 
     public Serde valueSerde() {
-        Serde<?> serde = getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class);
-        serde.configure(originals(), false);
-
-        return serde;
+        try {
+            Serde<?> serde = getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class);
+            serde.configure(originals(), false);
+            return serde;
+        } catch (Exception e) {
+            throw new StreamsException(String.format("Failed to configure value serde %s", get(StreamsConfig.VALUE_SERDE_CLASS_CONFIG)), e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 54cbe4e..fe0d99c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -19,7 +19,10 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -122,8 +125,16 @@ public abstract class AbstractTask {
 
     protected void initializeOffsetLimits() {
         for (TopicPartition partition : partitions) {
-            OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
-            stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
+            try {
+                OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API?
+                stateMgr.putOffsetLimit(partition, metadata != null ? metadata.offset() : 0L);
+            } catch (AuthorizationException e) {
+                throw new ProcessorStateException(String.format("AuthorizationException when initializing offsets for %s", partition), e);
+            } catch (WakeupException e) {
+                throw e;
+            } catch (KafkaException e) {
+                throw new ProcessorStateException(String.format("Failed to initialize offsets for %s", partition), e);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index c05702b..d777a4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
@@ -62,11 +63,19 @@ public class ProcessorNode<K, V> {
     }
 
     public void init(ProcessorContext context) {
-        processor.init(context);
+        try {
+            processor.init(context);
+        } catch (Exception e) {
+            throw new StreamsException(String.format("failed to initialize processor %s", name), e);
+        }
     }
 
     public void close() {
-        processor.close();
+        try {
+            processor.close();
+        } catch (Exception e) {
+            throw new StreamsException(String.format("failed to close processor %s", name), e);
+        }
     }
 
     public void process(final K key, final V value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index aa2f365..2e1e4da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -158,7 +160,12 @@ public class ProcessorStateManager {
                 // ignore
             }
 
-            List<PartitionInfo> partitionInfos = restoreConsumer.partitionsFor(topic);
+            List<PartitionInfo> partitionInfos = null;
+            try {
+                partitionInfos = restoreConsumer.partitionsFor(topic);
+            } catch (TimeoutException e) {
+                throw new StreamsException(String.format("task [%s] Could not find partition info for topic: %s", taskId, topic));
+            }
             if (partitionInfos == null) {
                 throw new StreamsException(String.format("task [%s] Could not find partition info for topic: %s", taskId, topic));
             }
@@ -190,7 +197,7 @@ public class ProcessorStateManager {
 
         // subscribe to the store's partition
         if (!restoreConsumer.subscription().isEmpty()) {
-            throw new IllegalStateException(String.format("task [%s]  Restore consumer should have not subscribed to any partitions beforehand", taskId));
+            throw new IllegalStateException(String.format("task [%s] Restore consumer should have not subscribed to any partitions beforehand", taskId));
         }
         TopicPartition storePartition = new TopicPartition(topicName, getPartition(topicName));
         restoreConsumer.assign(Collections.singletonList(storePartition));
@@ -268,7 +275,11 @@ public class ProcessorStateManager {
         int count = 0;
         for (ConsumerRecord<byte[], byte[]> record : records) {
             if (record.offset() < limit) {
-                restoreCallback.restore(record.key(), record.value());
+                try {
+                    restoreCallback.restore(record.key(), record.value());
+                } catch (Exception e) {
+                    throw new ProcessorStateException(String.format("task [%s] exception caught while trying to restore state from %s", taskId, storePartition), e);
+                }
                 lastOffset = record.offset();
             } else {
                 if (remainingRecords == null)
@@ -305,7 +316,11 @@ public class ProcessorStateManager {
                 if (processorNode != null) {
                     context.setCurrentNode(processorNode);
                 }
-                store.flush();
+                try {
+                    store.flush();
+                } catch (Exception e) {
+                    throw new ProcessorStateException(String.format("task [%s] Failed to flush state store %s", taskId, store.name()), e);
+                }
             }
         }
     }
@@ -321,8 +336,12 @@ public class ProcessorStateManager {
                 log.debug("task [{}] Closing stores.", taskId);
                 for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
                     log.debug("task [{}} Closing storage engine {}", taskId, entry.getKey());
-                    entry.getValue().flush();
-                    entry.getValue().close();
+                    try {
+                        entry.getValue().flush();
+                        entry.getValue().close();
+                    } catch (Exception e) {
+                        throw new ProcessorStateException(String.format("task [%s] Failed to close state store %s", taskId, entry.getKey()), e);
+                    }
                 }
 
                 Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index fe88b95..45687c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -23,7 +23,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +36,8 @@ import java.util.List;
 import java.util.Map;
 
 public class RecordCollector {
+    private static final int MAX_SEND_ATTEMPTS = 3;
+    private static final long SEND_RETRY_BACKOFF = 100L;
 
     /**
      * A supplier of a {@link RecordCollector} instance.
@@ -76,18 +81,30 @@ public class RecordCollector {
                 new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
         final String topic = serializedRecord.topic();
 
-        this.producer.send(serializedRecord, new Callback() {
-            @Override
-            public void onCompletion(RecordMetadata metadata, Exception exception) {
-                if (exception == null) {
-                    TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                    offsets.put(tp, metadata.offset());
-                } else {
-                    String prefix = String.format("task [%s]", streamTaskId);
-                    log.error(String.format("%s Error sending record to topic %s", prefix, topic), exception);
+        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
+            try {
+                this.producer.send(serializedRecord, new Callback() {
+                    @Override
+                    public void onCompletion(RecordMetadata metadata, Exception exception) {
+                        if (exception == null) {
+                            TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+                            offsets.put(tp, metadata.offset());
+                        } else {
+                            String prefix = String.format("task [%s]", streamTaskId);
+                            log.error(String.format("%s Error sending record to topic %s", prefix, topic), exception);
+                        }
+                    }
+                });
+                return;
+            } catch (TimeoutException e) {
+                if (attempt == MAX_SEND_ATTEMPTS) {
+                    throw new StreamsException(String.format("task [%s] failed to send record to topic %s after %d attempts", streamTaskId, topic, attempt));
                 }
+                log.warn(String.format("task [%s] timeout exception caught when sending record to topic %s attempt %s", streamTaskId, topic, attempt));
+                Utils.sleep(SEND_RETRY_BACKOFF);
             }
-        });
+
+        }
     }
 
     public void flush() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 7e5baf3..156b45d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -25,6 +25,8 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.ArrayDeque;
 
+import static java.lang.String.format;
+
 /**
  * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
  * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition
@@ -75,8 +77,21 @@ public class RecordQueue {
     public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords, TimestampExtractor timestampExtractor) {
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
             // deserialize the raw record, extract the timestamp and put into the queue
-            Object key = source.deserializeKey(rawRecord.topic(), rawRecord.key());
-            Object value = source.deserializeValue(rawRecord.topic(), rawRecord.value());
+            final Object key;
+            try {
+                key = source.deserializeKey(rawRecord.topic(), rawRecord.key());
+            } catch (Exception e) {
+                throw new StreamsException(format("Failed to deserialize key for record.
topic=%s, partition=%d, offset=%d",
+                                                  rawRecord.topic(), rawRecord.partition(),
rawRecord.offset()), e);
+            }
+
+            final Object value;
+            try {
+                value = source.deserializeValue(rawRecord.topic(), rawRecord.value());
+            } catch (Exception e) {
+                throw new StreamsException(format("Failed to deserialize value for record.
topic=%s, partition=%d, offset=%d",
+                                                  rawRecord.topic(), rawRecord.partition(),
rawRecord.offset()), e);
+            }
 
             ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
                                                                          rawRecord.timestamp(), TimestampType.CREATE_TIME,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2d40d88..14daf56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -21,9 +21,11 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -34,6 +36,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import static java.lang.String.format;
 import static java.util.Collections.singleton;
 
 /**
@@ -189,6 +192,14 @@ public class StreamTask extends AbstractTask implements Punctuator {
             if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
                 requiresPoll = true;
             }
+        } catch (KafkaException ke) {
+            throw new StreamsException(format("exception caught in process. taskId=%s, processor=%s,
topic=%s, partition=%d, offset=%d",
+                                              id.toString(),
+                                              currNode.name(),
+                                              record.topic(),
+                                              record.partition(),
+                                              record.offset()
+                                              ), ke);
         } finally {
             processorContext.setCurrentNode(null);
             this.currNode = null;
@@ -234,6 +245,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
         updateProcessorContext(createRecordContext(stampedRecord), node);
         try {
             node.processor().punctuate(timestamp);
+        } catch (KafkaException ke) {
+            throw new StreamsException(String.format("exception caught in punctuate. taskId=%s
processor=%s", id,  node.name()), ke);
         } finally {
             processorContext.setCurrentNode(null);
             currNode = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9c3b07c..f1913b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -313,9 +313,6 @@ public class StreamThread extends Thread {
         boolean requiresPoll = true;
         boolean polledRecords = false;
 
-        // TODO: this can be removed after KIP-62
-        long lastPoll = 0L;
-
         if (topicPattern != null) {
             consumer.subscribe(topicPattern, rebalanceListener);
         } else {
@@ -332,7 +329,6 @@ public class StreamThread extends Thread {
                 boolean longPoll = totalNumBuffered == 0;
 
                 ConsumerRecords<byte[], byte[]> records = consumer.poll(longPoll ? this.pollTimeMs : 0);
-                lastPoll = time.milliseconds();
 
                 if (rebalanceException != null)
                     throw new StreamsException(String.format("stream-thread [%s] Failed to
rebalance", this.getName()), rebalanceException);
@@ -373,11 +369,6 @@ public class StreamThread extends Thread {
                             commitOne(task);
                     }
 
-                    // if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
-                    // even when we paused all partitions.
-                    if (lastPoll + this.pollTimeMs < this.timerStartedMs)
-                        requiresPoll = true;
-
                 } else {
                     // even when no task is assigned, we must poll to get a task.
                     requiresPoll = true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 30306f0..5f36b9d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -19,9 +19,12 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -172,4 +175,40 @@ public class StreamsConfigTest {
     }
 
 
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.keySerde();
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.valueSerde();
+    }
+
+    static class MisconfiguredSerde implements Serde {
+        @Override
+        public void configure(final Map configs, final boolean isKey) {
+            throw new RuntimeException("boom");
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public Serializer serializer() {
+            return null;
+        }
+
+        @Override
+        public Deserializer deserializer() {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
new file mode 100644
index 0000000..b069cd9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class AbstractTaskTest {
+
+    @Test(expected = ProcessorStateException.class)
+    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() throws Exception {
+        final Consumer consumer = mockConsumer(new AuthorizationException("blah"));
+        final AbstractTask task = createTask(consumer);
+        task.initializeOffsetLimits();
+    }
+
+    @Test(expected = ProcessorStateException.class)
+    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() throws Exception {
+        final Consumer consumer = mockConsumer(new KafkaException("blah"));
+        final AbstractTask task = createTask(consumer);
+        task.initializeOffsetLimits();
+    }
+
+    @Test(expected = WakeupException.class)
+    public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() throws Exception {
+        final Consumer consumer = mockConsumer(new WakeupException());
+        final AbstractTask task = createTask(consumer);
+        task.initializeOffsetLimits();
+    }
+
+    private AbstractTask createTask(final Consumer consumer) {
+        return new AbstractTask(new TaskId(0, 0),
+                                "app",
+                                Collections.singletonList(new TopicPartition("t", 0)),
+                                new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                                                      Collections.<String, SourceNode>emptyMap(),
+                                                      Collections.<String, SinkNode>emptyMap(),
+                                                      Collections.<StateStore>emptyList(),
+                                                      Collections.<String, String>emptyMap(),
+                                                      Collections.<StateStore, ProcessorNode>emptyMap()
+                                               ),
+                                consumer,
+                                consumer,
+                                false,
+                                new StateDirectory("app", TestUtils.tempDirectory().getPath()),
+                                new ThreadCache(0)) {
+            @Override
+            public void commit() {
+                // do nothing
+            }
+        };
+    }
+
+    private Consumer mockConsumer(final RuntimeException toThrow) {
+        return new MockConsumer(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public OffsetAndMetadata committed(final TopicPartition partition) {
+                throw toThrow;
+            }
+        };
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
new file mode 100644
index 0000000..3aafc00
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class ProcessorNodeTest {
+
+    @SuppressWarnings("unchecked")
+    @Test (expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() throws Exception {
+        final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
+        node.init(null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test (expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() throws Exception {
+        final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
+        node.close();
+    }
+
+    private static class ExceptionalProcessor implements Processor {
+        @Override
+        public void init(final ProcessorContext context) {
+            throw new RuntimeException();
+        }
+
+        @Override
+        public void process(final Object key, final Object value) {
+            throw new RuntimeException();
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            throw new RuntimeException();
+        }
+
+        @Override
+        public void close() {
+            throw new RuntimeException();
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 922ddb3..23abf8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -17,15 +17,19 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.junit.Test;
 
@@ -33,6 +37,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 
@@ -119,4 +125,40 @@ public class RecordCollectorTest {
         assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
+        final AtomicInteger attempt = new AtomicInteger(0);
+        RecordCollector collector = new RecordCollector(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        if (attempt.getAndIncrement() == 0) {
+                            throw new TimeoutException();
+                        }
+                        return super.send(record, callback);
+                    }
+                },
+                "test");
+
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+        final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
+        assertEquals(Long.valueOf(0L), offset);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
+        RecordCollector collector = new RecordCollector(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        throw new TimeoutException();
+                    }
+                },
+                "test");
+
+        collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 7870611..f30e0e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -26,13 +26,16 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 public class RecordQueueTest {
@@ -115,4 +118,22 @@ public class RecordQueueTest {
         assertEquals(2, queue.size());
         assertEquals(5L, queue.timestamp());
     }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() throws Exception {
+        final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+                new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME,
0L, 0, 0, key, recordValue));
+
+        queue.addRawRecords(records, timestampExtractor);
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
+        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+                new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME,
0L, 0, 0, recordKey, value));
+
+        queue.addRawRecords(records, timestampExtractor);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83cde7c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 2b05e80..15f93d9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -30,8 +31,12 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -44,12 +49,15 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class StreamTaskTest {
 
@@ -283,6 +291,87 @@ public class StreamTaskTest {
 
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() throws Exception {
+        final StreamsConfig config = createConfig(baseDir);
+        final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {
+
+            @Override
+            public void process(final Object key, final Object value) {
+                throw new KafkaException("KABOOM!");
+            }
+        };
+
+        final List<ProcessorNode> processorNodes = Collections.<ProcessorNode>singletonList(processorNode);
+        final Map<String, SourceNode> sourceNodes
+                = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode);
+        final ProcessorTopology topology = new ProcessorTopology(processorNodes,
+                                                                 sourceNodes,
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList(),
+                                                                 Collections.<String, String>emptyMap(),
+                                                                 Collections.<StateStore, ProcessorNode>emptyMap());
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0));
+        final int offset = 20;
+        streamTask.addRecords(partition1, Collections.singletonList(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+
+        try {
+            streamTask.process();
+            fail("Should've thrown StreamsException");
+        } catch (StreamsException e) {
+            final String message = e.getMessage();
+            assertTrue("message=" + message + " should contain topic", message.contains("topic="
+ topic1[0]));
+            assertTrue("message=" + message + " should contain partition", message.contains("partition="
+ partition1.partition()));
+            assertTrue("message=" + message + " should contain offset", message.contains("offset="
+ offset));
+            assertTrue("message=" + message + " should contain processor", message.contains("processor="
+ processorNode.name()));
+        }
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating() throws Exception {
+        final StreamsConfig config = createConfig(baseDir);
+        final ProcessorNode punctuator = new ProcessorNode("test", new AbstractProcessor() {
+            @Override
+            public void init(final ProcessorContext context) {
+                context.schedule(1);
+            }
+
+            @Override
+            public void process(final Object key, final Object value) {
+                //
+            }
+
+            @Override
+            public void punctuate(final long timestamp) {
+                throw new KafkaException("KABOOM!");
+            }
+        }, Collections.<String>emptySet());
+
+        final List<ProcessorNode> processorNodes = Collections.singletonList(punctuator);
+
+
+        final ProcessorTopology topology = new ProcessorTopology(processorNodes,
+                                                                 Collections.<String, SourceNode>emptyMap(),
+                                                                 Collections.<String, SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList(),
+                                                                 Collections.<String, String>emptyMap(),
+                                                                 Collections.<StateStore, ProcessorNode>emptyMap());
+        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0));
+
+        try {
+            streamTask.punctuate(punctuator, 1);
+            fail("Should've thrown StreamsException");
+        } catch (StreamsException e) {
+            final String message = e.getMessage();
+            assertTrue("message=" + message + " should contain processor", message.contains("processor=test"));
+        }
+
+    }
+
     private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }

