kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4720: Add a KStream#peek(ForeachAction<K, V>) in DSL [Forced Update!]
Date Thu, 16 Feb 2017 05:21:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5fcc4584d -> cb725c6a2 (forced update)


KAFKA-4720: Add a KStream#peek(ForeachAction<K, V>) in DSL

https://issues.apache.org/jira/browse/KAFKA-4720

Peek is a handy method to have to insert diagnostics that do not affect the stream itself,
but some external state such as logging or metrics collection.

Author: Steven Schlansker <sschlansker@opentable.com>

Reviewers: Damian Guy, Matthias J. Sax, Eno Thereska, Guozhang Wang

Closes #2493 from stevenschlansker/kafka-4720-peek


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cb725c6a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cb725c6a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cb725c6a

Branch: refs/heads/trunk
Commit: cb725c6a2d21651fe8b2e50225635209aa949bb5
Parents: af18248
Author: Steven Schlansker <sschlansker@opentable.com>
Authored: Wed Feb 15 21:19:15 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Feb 15 21:20:49 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 14 ++++
 .../streams/kstream/internals/KStreamImpl.java  | 12 +++
 .../streams/kstream/internals/KStreamPeek.java  | 45 +++++++++++
 .../kstream/internals/KStreamPeekTest.java      | 85 ++++++++++++++++++++
 4 files changed, 156 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cb725c6a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 21135fb..64187e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -406,6 +406,20 @@ public interface KStream<K, V> {
     void foreach(final ForeachAction<? super K, ? super V> action);
 
     /**
+     * Perform an action on each record of {@code KStream}.
+     * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier,
String...)}).
+     * <p>
+     * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics
collection)
+     * and returns an unchanged stream.
+     * <p>
+     * Note that since this operation is stateless, it may execute multiple times for a single
record in failure cases.
+     *
+     * @param action an action to perform on each record
+     * @see #process(ProcessorSupplier, String...)
+     */
+    KStream<K, V> peek(final ForeachAction<? super K, ? super V> action);
+
+    /**
      * Creates an array of {@code KStream} from this stream by branching the records in the
original stream based on
      * the supplied predicates.
      * Each record is evaluated against the supplied predicates, and predicates are evaluated
in order.

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb725c6a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0434f06..f325dcf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -57,6 +57,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements
KStream<K, V
 
     public static final String FILTER_NAME = "KSTREAM-FILTER-";
 
+    public static final String PEEK_NAME = "KSTREAM-PEEK-";
+
     private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
 
     private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
@@ -318,6 +320,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
     }
 
     @Override
+    public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action)
{
+        Objects.requireNonNull(action, "action can't be null");
+        final String name = topology.newName(PEEK_NAME);
+
+        topology.addProcessor(name, new KStreamPeek<>(action), this.name);
+
+        return new KStreamImpl<>(topology, name, sourceNodes, repartitionRequired);
+    }
+
+    @Override
     public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde,
String topic) {
         return through(keySerde, valSerde, null, topic);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb725c6a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
new file mode 100644
index 0000000..3dc0513
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamPeek<K, V> implements ProcessorSupplier<K, V> {
+
+    private final ForeachAction<K, V> action;
+
+    public KStreamPeek(final ForeachAction<K, V> action) {
+        this.action = action;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamPeekProcessor();
+    }
+
+    private class KStreamPeekProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(final K key, final V value) {
+            action.apply(key, value);
+            context().forward(key, value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb725c6a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
new file mode 100644
index 0000000..48f4b65
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.After;
+import org.junit.Test;
+
+public class KStreamPeekTest {
+
+    private final String topicName = "topic";
+
+    private KStreamTestDriver driver = null;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+    }
+
+    @Test
+    public void shouldObserveStreamElements() {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<Integer, String> stream = builder.stream(Serdes.Integer(), Serdes.String(),
topicName);
+        final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(),
streamObserved = new ArrayList<>();
+        stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
+
+        driver = new KStreamTestDriver(builder);
+        final List<KeyValue<Integer, String>> expected = new ArrayList<>();
+        for (int key = 0; key < 32; key++) {
+            final String value = "V" + key;
+            driver.process(topicName, key, value);
+            expected.add(new KeyValue<>(key, value));
+        }
+
+        assertEquals(expected, peekObserved);
+        assertEquals(expected, streamObserved);
+    }
+
+    @Test
+    public void shouldNotAllowNullAction() {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<Integer, String> stream = builder.stream(Serdes.Integer(), Serdes.String(),
topicName);
+        try {
+            stream.peek(null);
+            fail("expected null action to throw NPE");
+        } catch (NullPointerException expected) { }
+    }
+
+    private static <K, V> ForeachAction<K, V> collect(final List<KeyValue<K,
V>> into) {
+        return new ForeachAction<K, V>() {
+            @Override
+            public void apply(final K key, final V value) {
+                into.add(new KeyValue<>(key, value));
+            }
+        };
+    }
+}


Mime
View raw message