kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4253: Fix Kafka Stream thread shutting down process ordering
Date Thu, 06 Oct 2016 16:43:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4efa833f4 -> c9da62794


KAFKA-4253: Fix Kafka Stream thread shutting down process ordering

Changed the ordering in `StreamThread.shutdown`:
1. commitAll (we need to commit so that any cached data is flushed through the topology)
2. close all tasks
3. producer.flush() - so any records produced during close are flushed and we have offsets for them
4. close all state managers
5. close producers/consumers
6. remove the tasks

Also changed the ordering in `onPartitionsRevoked`:
1. commitAll
2. close all tasks
3. producer.flush()
4. close all state managers

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #1970 from dguy/kafka-4253


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9da6279
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9da6279
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9da6279

Branch: refs/heads/trunk
Commit: c9da62794547ee75a7e2ade1aa8bbf4a1443c49c
Parents: 4efa833
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Oct 6 09:43:39 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Oct 6 09:43:39 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/AbstractTask.java       |  14 ++-
 .../internals/ProcessorStateManager.java        |   5 +-
 .../processor/internals/StandbyTask.java        |  10 ++
 .../streams/processor/internals/StreamTask.java |  13 ++-
 .../processor/internals/StreamThread.java       | 112 ++++++++++++++-----
 .../streams/state/internals/RocksDBStore.java   |   1 -
 .../processor/internals/AbstractTaskTest.java   |  10 ++
 .../internals/ProcessorStateManagerTest.java    |   6 +-
 .../processor/internals/StandbyTaskTest.java    |   4 +-
 .../processor/internals/StreamThreadTest.java   |   6 +
 .../state/internals/StateStoreTestUtils.java    |  31 +----
 .../apache/kafka/test/NoOpRecordCollector.java  |  48 ++++++++
 12 files changed, 189 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index fe0d99c..7bda3f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -108,10 +108,15 @@ public abstract class AbstractTask {
 
     public abstract void commit();
 
+
+    public abstract void close();
+
+    public abstract void commitOffsets();
+
     /**
      * @throws ProcessorStateException if there is an error while closing the state manager
      */
-    public void close() {
+    void closeStateManager() {
         try {
             stateMgr.close(recordCollectorOffsets());
         } catch (IOException e) {
@@ -168,4 +173,11 @@ public abstract class AbstractTask {
         sb.append("\n");
         return sb.toString();
     }
+
+    /**
+     * Flush all state stores owned by this task
+     */
+    public void flushState() {
+        stateMgr.flush((InternalProcessorContext) this.context());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2e1e4da..9d2e63f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -326,18 +326,17 @@ public class ProcessorStateManager {
     }
 
     /**
-     * @throws IOException if any error happens when flushing or closing the state stores
+     * @throws IOException if any error happens when closing the state stores
      */
     public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
         try {
-            // attempting to flush and close the stores, just in case they
+            // attempting to close the stores, just in case they
             // are not closed by a ProcessorNode yet
             if (!stores.isEmpty()) {
                 log.debug("task [{}] Closing stores.", taskId);
                 for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
                     log.debug("task [{}} Closing storage engine {}", taskId, entry.getKey());
                     try {
-                        entry.getValue().flush();
                         entry.getValue().close();
                     } catch (Exception e) {
                         throw new ProcessorStateException(String.format("task [%s] Failed
to close state store %s", taskId, entry.getKey()), e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 40c4d9c..e57b44a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -98,6 +98,16 @@ public class StandbyTask extends AbstractTask {
         initializeOffsetLimits();
     }
 
+    @Override
+    public void close() {
+        //no-op
+    }
+
+    @Override
+    public void commitOffsets() {
+        // no-op
+    }
+
     /**
      * Produces a string representation contain useful information about a StreamTask.
      * This is useful in debugging scenarios.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 14daf56..061cfeb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -269,6 +269,14 @@ public class StreamTask extends AbstractTask implements Punctuator {
         recordCollector.flush();
 
         // 3) commit consumed offsets if it is dirty already
+        commitOffsets();
+    }
+
+    /**
+     * commit consumed offsets if needed
+     */
+    @Override
+    public void commitOffsets() {
         if (commitOffsetNeeded) {
             Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new
HashMap<>(consumedOffsets.size());
             for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet())
{
@@ -333,10 +341,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
             }
         }
 
-        super.close();
-
-        if (exception != null)
+        if (exception != null) {
             throw exception;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f1913b8..0667865 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -132,8 +132,8 @@ public class StreamThread extends Thread {
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             try {
                 initialized.set(false);
-                commitAll();
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions
are assigned
+                shutdownTasksAndState(true);
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
@@ -259,21 +259,10 @@ public class StreamThread extends Thread {
 
     private void shutdown() {
         log.info("stream-thread [{}] Shutting down", this.getName());
+        shutdownTasksAndState(false);
 
-        // Exceptions should not prevent this call from going through all shutdown steps
-        try {
-            commitAll();
-        } catch (Throwable e) {
-            // already logged in commitAll()
-        }
 
-        // Close standby tasks before closing the restore consumer since closing standby
tasks uses the restore consumer.
-        removeStandbyTasks();
-
-        // We need to first close the underlying clients before closing the state
-        // manager, for example we need to make sure producer's record sends
-        // have all been acked before the state manager records
-        // changelog sent offsets
+        // close all embedded clients
         try {
             producer.close();
         } catch (Throwable e) {
@@ -290,11 +279,80 @@ public class StreamThread extends Thread {
             log.error("stream-thread [{}] Failed to close restore consumer: ", this.getName(),
e);
         }
 
+        // remove all tasks
         removeStreamTasks();
+        removeStandbyTasks();
 
         log.info("stream-thread [{}] Stream thread shutdown complete", this.getName());
     }
 
+    private void shutdownTasksAndState(final boolean rethrowExceptions) {
+        // Commit first as there may be cached records that have not been flushed yet.
+        commitOffsets(rethrowExceptions);
+        // Close all processors in topology order
+        closeAllTasks();
+        // flush state
+        flushAllState(rethrowExceptions);
+        // flush out any extra data sent during close
+        producer.flush();
+        // Close all task state managers
+        closeAllStateManagers(rethrowExceptions);
+    }
+
+    interface AbstractTaskAction {
+        void apply(final AbstractTask task);
+    }
+
+    private void performOnAllTasks(final AbstractTaskAction action,
+                                   final String exceptionMessage,
+                                   final boolean throwExceptions) {
+        final List<AbstractTask> allTasks = new ArrayList<AbstractTask>(activeTasks.values());
+        allTasks.addAll(standbyTasks.values());
+        for (final AbstractTask task : allTasks) {
+            try {
+                action.apply(task);
+            } catch (KafkaException e) {
+                log.error(String.format("stream-thread [%s] Failed to %s for %s %s: ",
+                                        StreamThread.this.getName(),
+                                        exceptionMessage,
+                                        task.getClass().getSimpleName(),
+                                        task.id()),
+                          e);
+                if (throwExceptions) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    private void closeAllStateManagers(final boolean throwExceptions) {
+        performOnAllTasks(new AbstractTaskAction() {
+            @Override
+            public void apply(final AbstractTask task) {
+                    task.closeStateManager();
+            }
+        }, "close state manager", throwExceptions);
+    }
+
+    private void commitOffsets(final boolean throwExceptions) {
+        // Exceptions should not prevent this call from going through all shutdown steps
+        performOnAllTasks(new AbstractTaskAction() {
+            @Override
+            public void apply(final AbstractTask task) {
+                task.commitOffsets();
+            }
+        }, "commit consumer offsets", throwExceptions);
+    }
+
+    private void flushAllState(final boolean throwExceptions) {
+        performOnAllTasks(new AbstractTaskAction() {
+            @Override
+            public void apply(final AbstractTask task) {
+                task.flushState();
+            }
+        }, "flush state", throwExceptions);
+    }
+
     /**
      * Compute the latency based on the current marked timestamp,
      * and update the marked timestamp with the current system timestamp.
@@ -593,9 +651,6 @@ public class StreamThread extends Thread {
 
     private void removeStreamTasks() {
         try {
-            for (StreamTask task : activeTasks.values()) {
-                closeOne(task);
-            }
             prevTasks.clear();
             prevTasks.addAll(activeTasks.keySet());
 
@@ -607,16 +662,6 @@ public class StreamThread extends Thread {
         }
     }
 
-    private void closeOne(AbstractTask task) {
-        log.info("stream-thread [{}] Removing a task {}", this.getName(), task.id());
-        try {
-            task.close();
-        } catch (StreamsException e) {
-            log.error(String.format("stream-thread [%s] Failed to close a %s %s: ", this.getName(),
task.getClass().getSimpleName(), task.id()), e);
-        }
-        sensors.taskDestructionSensor.record();
-    }
-
     protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions)
{
         sensors.taskCreationSensor.record();
 
@@ -701,12 +746,19 @@ public class StreamThread extends Thread {
         return sb.toString();
     }
 
+    private void closeAllTasks() {
+        performOnAllTasks(new AbstractTaskAction() {
+            @Override
+            public void apply(final AbstractTask task) {
+                log.info("stream-thread [{}] Removing a task {}", StreamThread.this.getName(),
task.id());
+                task.close();
+                sensors.taskDestructionSensor.record();
+            }
+        }, "close", false);
+    }
 
     private void removeStandbyTasks() {
         try {
-            for (StandbyTask task : standbyTasks.values()) {
-                closeOne(task);
-            }
             standbyTasks.clear();
             standbyTasksByPartition.clear();
             standbyRecords.clear();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 7bd1020..e27ffd8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -395,7 +395,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
         }
         open = false;
         closeOpenIterators();
-        flush();
         options.close();
         wOptions.close();
         fOptions.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index b069cd9..7cd0b8c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -77,6 +77,16 @@ public class AbstractTaskTest {
             public void commit() {
                 // do nothing
             }
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public void commitOffsets() {
+                // do nothing
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 7c22202..9198fa9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -31,8 +31,11 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -408,7 +411,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void testClose() throws IOException {
+    public void testFlushAndClose() throws IOException {
         final TaskId taskId = new TaskId(0, 1);
         File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
         // write an empty checkpoint file
@@ -445,6 +448,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
         } finally {
             // close the state manager with the ack'ed offsets
+            stateMgr.flush(new MockProcessorContext(StateSerdes.withBuiltinTypes("foo", String.class,
String.class), new NoOpRecordCollector()));
             stateMgr.close(ackedOffsets);
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 268697c..afd9bb6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -199,7 +199,7 @@ public class StandbyTaskTest {
         assertEquals(Collections.emptyList(), store1.keys);
         assertEquals(Utils.mkList(1, 2, 3), store2.keys);
 
-        task.close();
+        task.closeStateManager();
 
         File taskDir = stateDirectory.directoryForTask(taskId);
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
@@ -292,7 +292,7 @@ public class StandbyTaskTest {
         remaining = task.update(ktable, remaining);
         assertNull(remaining);
 
-        task.close();
+        task.closeStateManager();
 
         File taskDir = stateDirectory.directoryForTask(taskId);
         OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c7e9daa..2f252e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -143,6 +143,12 @@ public class StreamThreadTest {
         }
 
         @Override
+        public void commitOffsets() {
+            super.commitOffsets();
+            committed = true;
+        }
+
+        @Override
         protected void initializeOffsetLimits() {
             // do nothing
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index 1788159..d4cc99b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -16,18 +16,15 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
 
 import java.util.Collections;
 
@@ -49,32 +46,6 @@ public class StateStoreTestUtils {
 
     }
 
-    static class NoOpRecordCollector extends RecordCollector {
-        public NoOpRecordCollector() {
-            super(null, "StateStoreTestUtils");
-        }
-
-        @Override
-        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K>
keySerializer, final Serializer<V> valueSerializer) {
-            // no-op
-        }
-
-        @Override
-        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K>
keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V>
partitioner) {
-            // no-op
-        }
-
-        @Override
-        public void flush() {
-            //no-op
-        }
-
-        @Override
-        public void close() {
-            //no-op
-        }
-    }
-
     static class NoOpReadOnlyStore<K, V>
             implements ReadOnlyKeyValueStore<K, V>, StateStore {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9da6279/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
new file mode 100644
index 0000000..880a93b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+
+public class NoOpRecordCollector extends RecordCollector {
+    public NoOpRecordCollector() {
+        super(null, "NoOpRecordCollector");
+    }
+
+    @Override
+    public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K>
keySerializer, final Serializer<V> valueSerializer) {
+        // no-op
+    }
+
+    @Override
+    public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K>
keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V>
partitioner) {
+        // no-op
+    }
+
+    @Override
+    public void flush() {
+        //no-op
+    }
+
+    @Override
+    public void close() {
+        //no-op
+    }
+}


Mime
View raw message