kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5157; Options for handling corrupt data during deserialization
Date Mon, 10 Jul 2017 18:58:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6a5cd67b9 -> a1f97c8dc


KAFKA-5157; Options for handling corrupt data during deserialization

This is the implementation of KIP-161: https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>

Closes #3423 from enothereska/KAFKA-5157-deserialization-exceptions


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1f97c8d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1f97c8d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1f97c8d

Branch: refs/heads/trunk
Commit: a1f97c8dc4edd49c7a8a2ebb72734728029a85aa
Parents: 6a5cd67
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Mon Jul 10 11:58:51 2017 -0700
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Mon Jul 10 11:58:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 18 +++++
 .../errors/DeserializationExceptionHandler.java | 60 +++++++++++++++
 .../errors/LogAndContinueExceptionHandler.java  | 52 +++++++++++++
 .../errors/LogAndFailExceptionHandler.java      | 52 +++++++++++++
 .../internals/GlobalStateUpdateTask.java        | 45 ++++++------
 .../processor/internals/GlobalStreamThread.java |  3 +-
 .../processor/internals/ProcessorNode.java      |  3 +
 .../processor/internals/RecordQueue.java        | 21 +++++-
 .../internals/SourceNodeRecordDeserializer.java | 34 ++++++++-
 .../streams/processor/internals/StreamTask.java | 21 +++---
 .../internals/GlobalStateTaskTest.java          | 77 ++++++++++++++++++--
 .../processor/internals/PartitionGroupTest.java |  5 +-
 .../processor/internals/RecordQueueTest.java    | 60 +++++++++++++--
 .../SourceNodeRecordDeserializerTest.java       |  6 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |  3 +-
 15 files changed, 403 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index e7a8c44..606e314 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -146,6 +148,13 @@ public class StreamsConfig extends AbstractConfig {
     /** {@code connections.max.idle.ms} */
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
 
+    /**
+     * {@code default.deserialization.exception.handler}
+     */
+    public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
+    private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>DeserializationExceptionHandler</code> interface.";
+
+
     /** {@code default key.serde} */
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
     private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>Serde</code> interface.";
@@ -303,6 +312,11 @@ public class StreamsConfig extends AbstractConfig {
                     "",
                     Importance.MEDIUM,
                     CommonClientConfigs.CLIENT_ID_DOC)
+            .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                Type.CLASS,
+                LogAndFailExceptionHandler.class.getName(),
+                Importance.MEDIUM,
+                DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
             .define(DEFAULT_KEY_SERDE_CLASS_CONFIG,
                     Type.CLASS,
                     Serdes.ByteArraySerde.class.getName(),
@@ -790,6 +804,10 @@ public class StreamsConfig extends AbstractConfig {
         return timestampExtractor;
     }
 
+    public DeserializationExceptionHandler defaultDeserializationExceptionHandler() {
+        return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
+    }
+
     /**
      * Override any client properties in the original configs with overrides
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
new file mode 100644
index 0000000..c1abb4d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+/**
+ * Interface that specifies how an exception from source node deserialization
+ * (e.g., reading from Kafka) should be handled.
+ */
+public interface DeserializationExceptionHandler extends Configurable {
+    /**
+     * Inspect a record and the exception received, and decide whether processing should continue or fail.
+     * @param context processor context
+     * @param record record that failed deserialization
+     * @param exception the actual exception
+     */
+    DeserializationHandlerResponse handle(final ProcessorContext context,
+                                          final ConsumerRecord<byte[], byte[]> record,
+                                          final Exception exception);
+
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum DeserializationHandlerResponse {
+        /* continue with processing */
+        CONTINUE(0, "CONTINUE"),
+        /* fail the processing and stop */
+        FAIL(1, "FAIL");
+
+        /** an English description of the response--this is for debugging and can change */
+        public final String name;
+
+        /** the permanent and immutable id of a response--this can't change ever */
+        public final int id;
+
+        DeserializationHandlerResponse(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
new file mode 100644
index 0000000..dde4b52
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Deserialization handler that logs a deserialization exception and then
+ * signals the processing pipeline to continue processing more records.
+ */
+public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
+    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
+
+    @Override
+    public DeserializationHandlerResponse handle(final ProcessorContext context,
+                                                 final ConsumerRecord<byte[], byte[]> record,
+                                                 final Exception exception) {
+
+        log.warn("Exception caught during Deserialization, " +
+                        "taskId: {}, topic: {}, partition: {}, offset: {}",
+                context.taskId(), record.topic(), record.partition(), record.offset(),
+                exception);
+
+        return DeserializationHandlerResponse.CONTINUE;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+        // ignore
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
new file mode 100644
index 0000000..23557a3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+/**
+ * Deserialization handler that logs a deserialization exception and then
+ * signals the processing pipeline to stop processing more records and fail.
+ */
+public class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
+    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
+
+    @Override
+    public DeserializationHandlerResponse handle(final ProcessorContext context,
+                                                 final ConsumerRecord<byte[], byte[]> record,
+                                                 final Exception exception) {
+
+        log.warn("Exception caught during Deserialization, " +
+                        "taskId: {}, topic: {}, partition: {}, offset: {}",
+                context.taskId(), record.topic(), record.partition(), record.offset(),
+                exception);
+
+        return DeserializationHandlerResponse.FAIL;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+        // ignore
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 2540af0..38beb63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -29,31 +30,23 @@ import java.util.Set;
  */
 public class GlobalStateUpdateTask implements GlobalStateMaintainer {
 
-    private static class SourceNodeAndDeserializer {
-        private final SourceNode sourceNode;
-        private final RecordDeserializer deserializer;
-
-        SourceNodeAndDeserializer(final SourceNode sourceNode,
-                                  final RecordDeserializer deserializer) {
-            this.sourceNode = sourceNode;
-            this.deserializer = deserializer;
-        }
-    }
-
     private final ProcessorTopology topology;
     private final InternalProcessorContext processorContext;
     private final Map<TopicPartition, Long> offsets = new HashMap<>();
-    private final Map<String, SourceNodeAndDeserializer> deserializers = new HashMap<>();
+    private final Map<String, SourceNodeRecordDeserializer> deserializers = new HashMap<>();
     private final GlobalStateManager stateMgr;
+    private final DeserializationExceptionHandler deserializationExceptionHandler;
 
 
     public GlobalStateUpdateTask(final ProcessorTopology topology,
                                  final InternalProcessorContext processorContext,
-                                 final GlobalStateManager stateMgr) {
+                                 final GlobalStateManager stateMgr,
+                                 final DeserializationExceptionHandler deserializationExceptionHandler) {
 
         this.topology = topology;
         this.stateMgr = stateMgr;
         this.processorContext = processorContext;
+        this.deserializationExceptionHandler = deserializationExceptionHandler;
     }
 
     @SuppressWarnings("unchecked")
@@ -63,7 +56,7 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
             final SourceNode source = topology.source(sourceTopic);
-            deserializers.put(sourceTopic, new SourceNodeAndDeserializer(source, new SourceNodeRecordDeserializer(source)));
+            deserializers.put(sourceTopic, new SourceNodeRecordDeserializer(source, deserializationExceptionHandler));
         }
         initTopology();
         processorContext.initialized();
@@ -74,17 +67,21 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
     @SuppressWarnings("unchecked")
     @Override
     public void update(final ConsumerRecord<byte[], byte[]> record) {
-        final SourceNodeAndDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
-        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserializer.deserialize(record);
-        final ProcessorRecordContext recordContext =
+        final SourceNodeRecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
+        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.tryDeserialize(processorContext, record);
+
+        if (deserialized != null) {
+            final ProcessorRecordContext recordContext =
                 new ProcessorRecordContext(deserialized.timestamp(),
-                                           deserialized.offset(),
-                                           deserialized.partition(),
-                                           deserialized.topic());
-        processorContext.setRecordContext(recordContext);
-        processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode);
-        sourceNodeAndDeserializer.sourceNode.process(deserialized.key(), deserialized.value());
-        offsets.put(new TopicPartition(record.topic(), record.partition()), deserialized.offset() + 1);
+                    deserialized.offset(),
+                    deserialized.partition(),
+                    deserialized.topic());
+            processorContext.setRecordContext(recordContext);
+            processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
+            sourceNodeAndDeserializer.sourceNode().process(deserialized.key(), deserialized.value());
+        }
+
+        offsets.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
     }
 
     public void flushState() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 11b89df..2d0dcfb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -277,7 +277,8 @@ public class GlobalStreamThread extends Thread {
                                                                           stateMgr,
                                                                           streamsMetrics,
                                                                           cache),
-                                                                  stateMgr),
+                                                                  stateMgr,
+                                                                  config.defaultDeserializationExceptionHandler()),
                                         time,
                                         config.getLong(StreamsConfig.POLL_MS_CONFIG),
                                         config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 47f6311..0cc746e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -168,6 +168,7 @@ public class ProcessorNode<K, V> {
         final Sensor nodeProcessTimeSensor;
         final Sensor nodePunctuateTimeSensor;
         final Sensor sourceNodeForwardSensor;
+        final Sensor sourceNodeSkippedDueToDeserializationError;
         final Sensor nodeCreationSensor;
         final Sensor nodeDestructionSensor;
 
@@ -186,6 +187,7 @@ public class ProcessorNode<K, V> {
             this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
             this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
             this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
         }
 
         public void removeAllSensors() {
@@ -194,6 +196,7 @@ public class ProcessorNode<K, V> {
             metrics.removeSensor(sourceNodeForwardSensor);
             metrics.removeSensor(nodeCreationSensor);
             metrics.removeSensor(nodeDestructionSensor);
+            metrics.removeSensor(sourceNodeSkippedDueToDeserializationError);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 0902614..d26511c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,19 +41,25 @@ public class RecordQueue {
     private final TopicPartition partition;
     private final ArrayDeque<StampedRecord> fifoQueue;
     private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
-    private final RecordDeserializer recordDeserializer;
+    private final SourceNodeRecordDeserializer recordDeserializer;
+    private final DeserializationExceptionHandler deserializationExceptionHandler;
+    private final ProcessorContext processorContext;
 
     private long partitionTime = TimestampTracker.NOT_KNOWN;
 
     RecordQueue(final TopicPartition partition,
                 final SourceNode source,
-                final TimestampExtractor timestampExtractor) {
+                final TimestampExtractor timestampExtractor,
+                final DeserializationExceptionHandler deserializationExceptionHandler,
+                final ProcessorContext processorContext) {
         this.partition = partition;
         this.source = source;
         this.timestampExtractor = timestampExtractor;
         this.fifoQueue = new ArrayDeque<>();
         this.timeTracker = new MinTimestampTracker<>();
-        this.recordDeserializer = new SourceNodeRecordDeserializer(source);
+        this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler);
+        this.deserializationExceptionHandler = deserializationExceptionHandler;
+        this.processorContext = processorContext;
     }
 
 
@@ -81,7 +89,12 @@ public class RecordQueue {
      */
     public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-            ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);
+
+            ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord);
+            if (record == null) {
+                continue;
+            }
+
             long timestamp = timestampExtractor.extract(record, timeTracker.get());
             log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
index f66c0d9..e26d110 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
@@ -18,15 +18,21 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
 
 import static java.lang.String.format;
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
 
 class SourceNodeRecordDeserializer implements RecordDeserializer {
     private final SourceNode sourceNode;
+    private final DeserializationExceptionHandler deserializationExceptionHandler;
 
-    SourceNodeRecordDeserializer(final SourceNode sourceNode) {
+    SourceNodeRecordDeserializer(final SourceNode sourceNode,
+                                 final DeserializationExceptionHandler deserializationExceptionHandler) {
         this.sourceNode = sourceNode;
+        this.deserializationExceptionHandler = deserializationExceptionHandler;
     }
 
     @Override
@@ -54,4 +60,30 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
                                     rawRecord.serializedValueSize(), key, value);
 
     }
+
+    public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext,
+                                                         ConsumerRecord<byte[], byte[]> rawRecord) {
+
+        // on any deserialization failure, let the handler decide: FAIL rethrows, otherwise the record is skipped
+        try {
+            return deserialize(rawRecord);
+        } catch (Exception e) {
+            final DeserializationExceptionHandler.DeserializationHandlerResponse response =
+                    deserializationExceptionHandler.handle(processorContext, rawRecord, e);
+            if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
+                throw new StreamsException("Deserialization exception handler is set to fail upon" +
+                        " a deserialization error. If you would rather have the streaming pipeline" +
+                        " continue after a deserialization error, please set the " +
+                        DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
+                        e);
+            } else {
+                sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
+            }
+        }
+        return null;
+    }
+
+    public SourceNode sourceNode() {
+        return sourceNode;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3fd4596..6ff1818 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -122,15 +123,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         // to corresponding source nodes in the processor topology
         final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
 
-        final TimestampExtractor defaultTimestampExtractor  = config.defaultTimestampExtractor();
-        for (final TopicPartition partition : partitions) {
-            final SourceNode source = topology.source(partition.topic());
-            final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
-            final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor);
-            partitionQueues.put(partition, queue);
-        }
-
-        partitionGroup = new PartitionGroup(partitionQueues);
 
         // initialize the consumed offset cache
         consumedOffsets = new HashMap<>();
@@ -140,6 +132,17 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
         // initialize the topology with its own context
         processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
+
+        final TimestampExtractor defaultTimestampExtractor  = config.defaultTimestampExtractor();
+        final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
+        for (final TopicPartition partition : partitions) {
+            final SourceNode source = topology.source(partition.topic());
+            final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
+            final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext);
+            partitionQueues.put(partition, queue);
+        }
+
+        partitionGroup = new PartitionGroup(partitionQueues);
         this.time = time;
         log.debug("{} Initializing", logPrefix);
         initializeStateStores();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 4022ba9..7859a06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -18,10 +18,15 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.test.GlobalStateManagerStub;
 import org.apache.kafka.test.MockProcessorNode;
@@ -41,6 +46,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class GlobalStateTaskTest {
 
@@ -53,6 +59,7 @@ public class GlobalStateTaskTest {
     private TopicPartition t2;
     private MockSourceNode sourceOne;
     private MockSourceNode sourceTwo;
+    private ProcessorTopology topology;
 
     @Before
     public void before() {
@@ -70,12 +77,12 @@ public class GlobalStateTaskTest {
         final Map<String, String> storeToTopic = new HashMap<>();
         storeToTopic.put("t1-store", "t1");
         storeToTopic.put("t2-store", "t2");
-        final ProcessorTopology topology = new ProcessorTopology(processorNodes,
-                                                                 sourceByTopics,
-                                                                 Collections.<String, SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>emptyList(),
-                                                                 storeToTopic,
-                                                                 Collections.<StateStore>emptyList());
+        topology = new ProcessorTopology(processorNodes,
+                                         sourceByTopics,
+                                         Collections.<String, SinkNode>emptyMap(),
+                                         Collections.<StateStore>emptyList(),
+                                         storeToTopic,
+                                         Collections.<StateStore>emptyList());
         context = new NoOpProcessorContext();
 
         t1 = new TopicPartition("t1", 1);
@@ -84,7 +91,7 @@ public class GlobalStateTaskTest {
         offsets.put(t1, 50L);
         offsets.put(t2, 100L);
         stateMgr = new GlobalStateManagerStub(storeNames, offsets);
-        globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr);
+        globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler());
     }
 
     @Test
@@ -129,6 +136,62 @@ public class GlobalStateTaskTest {
         assertEquals(0, sourceOne.numReceived);
     }
 
+    private void maybeDeserialize(final GlobalStateUpdateTask globalStateTask,
+                                  final byte[] key,
+                                  final byte[] recordValue,
+                                  boolean failExpected) {
+        final ConsumerRecord record = new ConsumerRecord<>("t2", 1, 1,
+                0L, TimestampType.CREATE_TIME, 0L, 0, 0,
+                key, recordValue);
+        globalStateTask.initialize();
+        try {
+            globalStateTask.update(record);
+            if (failExpected) {
+                fail("Should have failed to deserialize.");
+            }
+        } catch (StreamsException e) {
+            if (!failExpected) {
+                fail("Shouldn't have failed to deserialize.");
+            }
+        }
+    }
+
+
+    @Test
+    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() throws Exception {
+        final byte[] key = new LongSerializer().serialize("t2", 1L);
+        final byte[] recordValue = new IntegerSerializer().serialize("t2", 10);
+        maybeDeserialize(globalStateTask, key, recordValue, true);
+    }
+
+
+    @Test
+    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
+        final byte[] key = new IntegerSerializer().serialize("t2", 1);
+        final byte[] recordValue = new LongSerializer().serialize("t2", 10L);
+        maybeDeserialize(globalStateTask, key, recordValue, true);
+    }
+
+    @Test
+    public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception {
+        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
+            new LogAndContinueExceptionHandler());
+        final byte[] key = new LongSerializer().serialize("t2", 1L);
+        final byte[] recordValue = new IntegerSerializer().serialize("t2", 10);
+
+        maybeDeserialize(globalStateTask2, key, recordValue, false);
+    }
+
+    @Test
+    public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
+        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
+            new LogAndContinueExceptionHandler());
+        final byte[] key = new IntegerSerializer().serialize("t2", 1);
+        final byte[] recordValue = new LongSerializer().serialize("t2", 10L);
+
+        maybeDeserialize(globalStateTask2, key, recordValue, false);
+    }
+
 
     @Test
     public void shouldCloseStateManagerWithOffsets() throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 476b009..d9f38eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -40,8 +41,8 @@ public class PartitionGroupTest {
     private final String[] topics = {"topic"};
     private final TopicPartition partition1 = new TopicPartition(topics[0], 1);
     private final TopicPartition partition2 = new TopicPartition(topics[0], 2);
-    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor);
-    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor);
+    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
+    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index f80b7eb..6c45fd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -27,12 +27,19 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -44,13 +51,30 @@ public class RecordQueueTest {
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
     private final String[] topics = {"topic"};
+
+    final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
+            new RecordCollectorImpl(null, null));
+    private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
-                                                      new MockSourceNode<>(topics, intDeserializer, intDeserializer),
-                                                      timestampExtractor);
+            mockSourceNodeWithMetrics,
+            timestampExtractor, new LogAndFailExceptionHandler(), context);
+    private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(new TopicPartition(topics[0], 1),
+            mockSourceNodeWithMetrics,
+            timestampExtractor, new LogAndContinueExceptionHandler(), context);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
+    @Before
+    public void before() {
+        mockSourceNodeWithMetrics.init(context);
+    }
+
+    @After
+    public void after() {
+        mockSourceNodeWithMetrics.close();
+    }
+
     @Test
     public void testTimeTracking() {
 
@@ -140,14 +164,38 @@ public class RecordQueueTest {
         queue.addRawRecords(records);
     }
 
+    @Test
+    public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception {
+        final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
+        final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+
+        queueThatSkipsDeserializeErrors.addRawRecords(records);
+        assertEquals(0, queueThatSkipsDeserializeErrors.size());
+    }
+
+    @Test
+    public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() throws Exception {
+        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));
+
+        queueThatSkipsDeserializeErrors.addRawRecords(records);
+        assertEquals(0, queueThatSkipsDeserializeErrors.size());
+    }
+
+
     @Test(expected = StreamsException.class)
     public void shouldThrowOnNegativeTimestamp() {
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
                 new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
         final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
-                                                          new MockSourceNode<>(topics, intDeserializer, intDeserializer),
-                                                          new FailOnInvalidTimestamp());
+                                                  new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+                                                  new FailOnInvalidTimestamp(),
+                                                  new LogAndContinueExceptionHandler(),
+                                  null);
         queue.addRawRecords(records);
     }
 
@@ -158,7 +206,9 @@ public class RecordQueueTest {
 
         final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
                                                   new MockSourceNode<>(topics, intDeserializer, intDeserializer),
-                                                  new LogAndSkipOnInvalidTimestamp());
+                                                  new LogAndSkipOnInvalidTimestamp(),
+                                                  new LogAndContinueExceptionHandler(),
+                                  null);
         queue.addRawRecords(records);
 
         assertEquals(0, queue.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
index a9f41e7..9ba8308 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
@@ -43,21 +43,21 @@ public class SourceNodeRecordDeserializerTest {
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() throws Exception {
         final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
-                new TheSourceNode(true, false));
+                new TheSourceNode(true, false), null);
         recordDeserializer.deserialize(rawRecord);
     }
 
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionIfKeyValueFailsToDeserialize() throws Exception {
         final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
-                new TheSourceNode(false, true));
+                new TheSourceNode(false, true), null);
         recordDeserializer.deserialize(rawRecord);
     }
 
     @Test
     public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() throws Exception {
         final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
-                new TheSourceNode(false, false, "key", "value"));
+                new TheSourceNode(false, false, "key", "value"), null);
         final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);
         assertEquals(rawRecord.topic(), record.topic());
         assertEquals(rawRecord.partition(), record.partition());

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1f97c8d/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index a9f020b..c59113e 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -205,7 +206,7 @@ public class ProcessorTopologyTestDriver {
             final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                         new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
-                                                        stateManager
+                                                        stateManager, new LogAndContinueExceptionHandler()
             );
             globalStateTask.initialize();
         }


Mime
View raw message