kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5815; add Printed class and KStream#print(printed)
Date Fri, 08 Sep 2017 17:22:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e16b9143d -> 4769e3d92


KAFKA-5815; add Printed class and KStream#print(printed)

Part of KIP-182
- Add `Printed` class and `KStream#print(Printed)`
- deprecate all other `print` and `writeAsText` methods

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3768 from dguy/kafka-5652-printed


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4769e3d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4769e3d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4769e3d9

Branch: refs/heads/trunk
Commit: 4769e3d92acdc6036f1f834c70004f0c867ae582
Parents: e16b914
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Sep 8 18:22:04 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Sep 8 18:22:04 2017 +0100

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |  12 +-
 docs/streams/upgrade-guide.html                 |   6 +
 .../apache/kafka/streams/kstream/KStream.java   |  37 ++++++
 .../streams/kstream/PrintForeachAction.java     |  61 ---------
 .../apache/kafka/streams/kstream/Printed.java   | 126 +++++++++++++++++++
 .../streams/kstream/internals/KStreamImpl.java  |  29 ++---
 .../streams/kstream/internals/KStreamPrint.java |  43 +------
 .../streams/kstream/internals/KTableImpl.java   |   5 +-
 .../kstream/internals/PrintForeachAction.java   |  64 ++++++++++
 .../kstream/internals/PrintedInternal.java      |  36 ++++++
 .../kafka/streams/kstream/PrintedTest.java      | 126 +++++++++++++++++++
 .../kstream/internals/KStreamImplTest.java      |   7 +-
 .../kstream/internals/KStreamPrintTest.java     |  19 +--
 13 files changed, 426 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 05acb55..42a9b20 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -1016,10 +1016,14 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
                 <pre class="brush: java;">
                        KStream&lt;byte[], String&gt; stream = ...;
                        stream.print();
-
-                       // Several variants of `print` exist to e.g. override the default
serdes for record keys
-                       // and record values, set a prefix label for the output string, etc
-                       stream.print(Serdes.ByteArray(), Serdes.String());
+                    
+                       // You can also override how and where the data is printed, i.e, to
file:
+                       stream.print(Printed.toFile("stream.out"));
+
+                       // with a custom KeyValueMapper and label
+                       stream.print(Printed.toSysOut()
+                                .withLabel("my-stream")
+                                .withKeyValueMapper((key, value) -> key + " -> " +
value));
                 </pre>
             </td>
         </tr>

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/docs/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index ffb365e..96c5941 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -86,6 +86,12 @@
     </p>
 
     <p>
+        With the introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines">KIP-182</a>
+        you should no longer pass in <code>Serde</code> to <code>KStream#print</code>
operations.
+        If you can't rely on using <code>toString</code> to print your keys an
values, you should instead you provide a custom <code>KeyValueMapper</code> via
the <code>Printed#withKeyValueMapper</code> call.
+    </p>
+
+    <p>
         Windowed aggregations have moved from <code>KGroupedStream</code> to
<code>WindowedKStream</code>.
         You can now perform a windowed aggregation by, for example, using <code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
         Note: the previous aggregate functions on <code>KGroupedStream</code>
still work, but have been deprecated.

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index c1e5b87..3a51fad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -274,7 +274,9 @@ public interface KStream<K, V> {
      * <p>
      * Implementors will need to override {@code toString()} for keys and values that are
not of type {@link String},
      * {@link Integer} etc. to get meaningful information.
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void print();
 
     /**
@@ -288,7 +290,9 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param label the name used to label the key/value pairs printed to the console
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void print(final String label);
 
     /**
@@ -304,7 +308,9 @@ public interface KStream<K, V> {
      *
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde value serde used to deserialize value if type is {@code byte[]},
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void print(final Serde<K> keySerde,
                final Serde<V> valSerde);
 
@@ -320,7 +326,9 @@ public interface KStream<K, V> {
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
      * @param label the name used to label the key/value pairs printed to the console
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void print(final Serde<K> keySerde,
                final Serde<V> valSerde,
                final String label);
@@ -344,7 +352,9 @@ public interface KStream<K, V> {
      * The KeyValueMapper's mapped value type must be {@code String}.
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper);
 
     /**
@@ -367,7 +377,9 @@ public interface KStream<K, V> {
      *
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
      * @param label The given name which labels output will be printed.
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String
label);
 
     /**
@@ -394,7 +406,9 @@ public interface KStream<K, V> {
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
      * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code
byte[]}.
      * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code
byte[]}.
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K>
keySerde, final Serde<V> valSerde);
 
     /**
@@ -422,10 +436,18 @@ public interface KStream<K, V> {
      * @param keySerde a {@link Serde<K>} used to deserialize key if type is {@code
byte[]}.
      * @param valSerde a {@link Serde<V>} used to deserialize value if type is {@code
byte[]}.
      * @param label The given name which labels output will be printed.
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K>
keySerde, final Serde<V> valSerde, final String label);
 
     /**
+     * Print the records of this KStream using the options provided by {@link Printed}
+     * @param printed options for printing
+     */
+    void print(final Printed<K, V> printed);
+
+    /**
      * Write the records of this stream to a file at the given path.
      * This function will use the generated name of the parent processor node to label the
key/value pairs printed to
      * the file.
@@ -437,7 +459,9 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param filePath name of the file to write to
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void writeAsText(final String filePath);
 
     /**
@@ -452,7 +476,9 @@ public interface KStream<K, V> {
      *
      * @param filePath   name of the file to write to
      * @param label the name used to label the key/value pairs written to the file
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void writeAsText(final String filePath,
                      final String label);
 
@@ -470,7 +496,9 @@ public interface KStream<K, V> {
      * @param filePath name of the file to write to
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde value serde used to deserialize value if type is {@code byte[]},
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void writeAsText(final String filePath,
                      final Serde<K> keySerde,
                      final Serde<V> valSerde);
@@ -489,7 +517,9 @@ public interface KStream<K, V> {
      * @param label the name used to label the key/value pairs written to the file
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used deserialize value if type is {@code byte[]},
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void writeAsText(final String filePath,
                      final String label,
                      final Serde<K> keySerde,
@@ -517,7 +547,9 @@ public interface KStream<K, V> {
      *
      * @param filePath path of the file to write to.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V,
String> mapper);
 
     /**
@@ -543,7 +575,9 @@ public interface KStream<K, V> {
      * @param filePath path of the file to write to.
      * @param label the name used to label records written to file.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void writeAsText(final String filePath, final String label, final KeyValueMapper<?
super K, ? super V, String> mapper);
 
     /**
@@ -573,7 +607,9 @@ public interface KStream<K, V> {
      * @param keySerde key serde used to deserialize key if type is {@code byte[]}.
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     * @deprecated use {@code print(Printed)}
      */
+    @Deprecated
     void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V>
valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
 
     /**
@@ -604,6 +640,7 @@ public interface KStream<K, V> {
      * @param keySerde key serde used to deserialize key if type is {@code byte[]}.
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}.
      * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     * @deprecated use {@code print(Printed)}
      */
     void writeAsText(final String filePath, final String label, final Serde<K> keySerde,
final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
deleted file mode 100644
index 5e8ec28..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream;
-
-import java.io.PrintWriter;
-
-public class PrintForeachAction<K, V> implements ForeachAction<K, V> {
-
-    private final String label;
-    private final PrintWriter printWriter;
-    private final KeyValueMapper<? super K, ? super V, String> mapper;
-    /**
-     * Print customized output with given writer. The PrintWriter can be null in order to
-     * distinguish between {@code System.out} and the others. If the PrintWriter is {@code
PrintWriter(System.out)},
-     * then it would close {@code System.out} output stream.
-     * <p>
-     * Afterall, not to pass in {@code PrintWriter(System.out)} but {@code null} instead.
-     *
-     * @param printWriter Use {@code System.out.println} if {@code null}.
-     * @param mapper The mapper which can allow user to customize output will be printed.
-     * @param label The given name will be printed.
-     */
-    public PrintForeachAction(final PrintWriter printWriter, final KeyValueMapper<? super
K, ? super V, String> mapper, final String label) {
-        this.printWriter = printWriter;
-        this.mapper = mapper;
-        this.label = label;
-    }
-
-    @Override
-    public void apply(final K key, final V value) {
-        final String data = String.format("[%s]: %s", label, mapper.apply(key, value));
-        if (printWriter == null) {
-            System.out.println(data);
-        } else {
-            printWriter.println(data);
-        }
-    }
-
-    public void close() {
-        if (printWriter == null) {
-            System.out.flush();
-        } else {
-            printWriter.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
new file mode 100644
index 0000000..bdea27b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.errors.TopologyException;
+
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+/**
+ * An object to define the options used when printing a {@link KStream}.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @see KStream#print(Printed)
+ */
+public class Printed<K, V> {
+    protected final PrintWriter printWriter;
+    protected String label;
+    protected KeyValueMapper<? super K, ? super V, String> mapper = new KeyValueMapper<K,
V, String>() {
+        @Override
+        public String apply(final K key, final V value) {
+            return String.format("%s, %s", key, value);
+        }
+    };
+
+    private Printed(final PrintWriter printWriter) {
+        this.printWriter = printWriter;
+    }
+
+    /**
+     * Copy constructor.
+     * @param printed   instance of {@link Printed} to copy
+     */
+    public Printed(final Printed<K, V> printed) {
+        this.printWriter = printed.printWriter;
+        this.label = printed.label;
+        this.mapper = printed.mapper;
+    }
+
+    /**
+     * Print the records of a {@link KStream} to a file.
+     *
+     * @param filePath path of the file
+     * @param <K>      key type
+     * @param <V>      value type
+     * @return a new Printed instance
+     */
+    public static <K, V> Printed<K, V> toFile(final String filePath) {
+        Objects.requireNonNull(filePath, "filePath can't be null");
+        if (filePath.trim().isEmpty()) {
+            throw new TopologyException("filePath can't be an empty string");
+        }
+        try {
+            return new Printed<>(new PrintWriter(filePath, StandardCharsets.UTF_8.name()));
+        } catch (final FileNotFoundException | UnsupportedEncodingException e) {
+            throw new TopologyException("Unable to write stream to file at [" + filePath
+ "] " + e.getMessage());
+        }
+    }
+
+    /**
+     * Print the records of a {@link KStream} to system out.
+     *
+     * @param <K> key type
+     * @param <V> value type
+     * @return a new Printed instance
+     */
+    public static <K, V> Printed<K, V> toSysOut() {
+        return new Printed<>((PrintWriter) null);
+    }
+
+    /**
+     * Print the records of a {@link KStream} with the provided label.
+     *
+     * @param label label to use
+     * @return this
+     */
+    public Printed<K, V> withLabel(final String label) {
+        Objects.requireNonNull(label, "label can't be null");
+        this.label = label;
+        return this;
+    }
+
+    /**
+     * Print the records of a {@link KStream} with the provided {@link KeyValueMapper}
+     * The provided KeyValueMapper's mapped value type must be {@code String}.
+     * <p>
+     * The example below shows how to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     *
+     * Implementors will need to override {@code toString()} for keys and values that are
not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param mapper mapper to use
+     * @return this
+     */
+    public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super
V, String> mapper) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
+        this.mapper = mapper;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 93fe5b3..7adc426 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
@@ -30,7 +29,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.PrintForeachAction;
+import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
@@ -43,11 +42,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.Stores;
 
-import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Array;
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Objects;
@@ -236,8 +231,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
                       final String label) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(label, "label can't be null");
-        String name = builder.newName(PRINTING_NAME);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(null,
mapper, label), keySerde, valSerde), this.name);
+        print(Printed.<K, V>toSysOut().withLabel(label).withKeyValueMapper(mapper));
+    }
+
+    @Override
+    public void print(final Printed<K, V> printed) {
+        Objects.requireNonNull(printed, "printed can't be null");
+        final PrintedInternal<K, V> printedInternal = new PrintedInternal<>(printed);
+        final String name = builder.newName(PRINTING_NAME);
+        builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name),
this.name);
     }
 
     @Override
@@ -295,16 +297,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
         Objects.requireNonNull(filePath, "filePath can't be null");
         Objects.requireNonNull(label, "label can't be null");
         Objects.requireNonNull(mapper, "mapper can't be null");
-        if (filePath.trim().isEmpty()) {
-            throw new TopologyException("filePath can't be an empty string");
-        }
-        final String name = builder.newName(PRINTING_NAME);
-        try {
-            PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
-            builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new
PrintForeachAction<>(printWriter, mapper, label), keySerde, valSerde), this.name);
-        } catch (FileNotFoundException | UnsupportedEncodingException e) {
-            throw new TopologyException("Unable to write stream to file at [" + filePath
+ "] " + e.getMessage());
-        }
+        print(Printed.<K, V>toFile(filePath).withKeyValueMapper(mapper).withLabel(label));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
index 8447ae1..ff82051 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
@@ -17,67 +17,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
 
 public class KStreamPrint<K, V> implements ProcessorSupplier<K, V> {
 
-    private final Serde<?> keySerde;
-    private final Serde<?> valueSerde;
     private final ForeachAction<K, V> action;
     
-    public KStreamPrint(final ForeachAction<K, V> action, final Serde<?> keySerde,
final Serde<?> valueSerde) {
+    public KStreamPrint(final ForeachAction<K, V> action) {
         this.action = action;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
     }
 
     @Override
     public Processor<K, V> get() {
-        return new KStreamPrintProcessor(keySerde, valueSerde);
+        return new KStreamPrintProcessor();
     }
 
     private class KStreamPrintProcessor extends AbstractProcessor<K, V> {
-        
-        private Serde<?> keySerde;
-        private Serde<?> valueSerde;
-        private ProcessorContext context;
-        
-        public KStreamPrintProcessor(final Serde<?> keySerde, final Serde<?>
valueSerde) {
-            this.keySerde = keySerde;
-            this.valueSerde = valueSerde;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            this.context = context;
-            if (keySerde == null) {
-                this.keySerde = context.keySerde();
-            }
-            if (valueSerde == null) {
-                this.valueSerde = context.valueSerde();
-            }
-        }
 
         @Override
         public void process(final K key, final V value) {
-            final K deKey = (K) maybeDeserialize(key, keySerde.deserializer());
-            final V deValue = (V) maybeDeserialize(value, valueSerde.deserializer());
-            action.apply(deKey, deValue);
+            action.apply(key, value);
         }
 
-        private Object maybeDeserialize(final Object keyOrValue, final Deserializer<?>
deserializer) {
-            if (keyOrValue instanceof byte[]) {
-                return deserializer.deserialize(this.context.topic(), (byte[]) keyOrValue);
-            }
-            return keyOrValue;
-        }
-        
         @Override
         public void close() {
             if (action instanceof PrintForeachAction) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 0e06944..87277b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -269,7 +268,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
                       final String label) {
         Objects.requireNonNull(label, "label can't be null");
         final String name = builder.newName(PRINTING_NAME);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null,
defaultKeyValueMapper, label), keySerde, valSerde), this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null,
defaultKeyValueMapper, label)), this.name);
     }
 
     @SuppressWarnings("deprecation")
@@ -310,7 +309,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
         String name = builder.newName(PRINTING_NAME);
         try {
             PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
-            builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new
PrintForeachAction(printWriter, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
+            builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new
PrintForeachAction(printWriter, defaultKeyValueMapper, label)), this.name);
         } catch (final FileNotFoundException | UnsupportedEncodingException e) {
             throw new TopologyException(String.format("Unable to write stream to file at
[%s] %s", filePath, e.getMessage()));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
new file mode 100644
index 0000000..dcdd44f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+
+import java.io.PrintWriter;
+
+public class PrintForeachAction<K, V> implements ForeachAction<K, V> {
+
+    private final String label;
+    private final PrintWriter printWriter;
+    private final KeyValueMapper<? super K, ? super V, String> mapper;
+    /**
+     * Print customized output with given writer. The PrintWriter can be null in order to
+     * distinguish between {@code System.out} and the others. If the PrintWriter is {@code
PrintWriter(System.out)},
+     * then it would close {@code System.out} output stream.
+     * <p>
+     * Afterall, not to pass in {@code PrintWriter(System.out)} but {@code null} instead.
+     *
+     * @param printWriter Use {@code System.out.println} if {@code null}.
+     * @param mapper The mapper which can allow user to customize output will be printed.
+     * @param label The given name will be printed.
+     */
+    public PrintForeachAction(final PrintWriter printWriter, final KeyValueMapper<? super
K, ? super V, String> mapper, final String label) {
+        this.printWriter = printWriter;
+        this.mapper = mapper;
+        this.label = label;
+    }
+
+    @Override
+    public void apply(final K key, final V value) {
+        final String data = String.format("[%s]: %s", label, mapper.apply(key, value));
+        if (printWriter == null) {
+            System.out.println(data);
+        } else {
+            printWriter.println(data);
+        }
+    }
+
+    public void close() {
+        if (printWriter == null) {
+            System.out.flush();
+        } else {
+            printWriter.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
new file mode 100644
index 0000000..7e1a02d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Printed;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public class PrintedInternal<K, V> extends Printed<K, V> {
+    public PrintedInternal(final Printed<K, V> printed) {
+        super(printed);
+    }
+
+    /**
+     * Builds the {@link ProcessorSupplier} that will be used to print the records flowing
through a {@link KStream}.
+     *
+     * @return the {@code ProcessorSupplier} to be used for printing
+     */
+    public ProcessorSupplier<K, V> build(final String processorName) {
+        return new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper,
label != null ? label : processorName));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
new file mode 100644
index 0000000..849d94f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.internals.PrintedInternal;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+public class PrintedTest {
+
+    private final ByteArrayOutputStream sysOut = new ByteArrayOutputStream();
+    private final Printed<String, Integer> sysOutPrinter = Printed.toSysOut();
+
+    @Before
+    public void before() {
+        System.setOut(new PrintStream(sysOut));
+    }
+
+    @After
+    public void after() {
+        System.setOut(null);
+    }
+
+    @Test
+    public void shouldCreateProcessorThatPrintsToFile() throws IOException {
+        final File file = TestUtils.tempFile();
+        final ProcessorSupplier<String, Integer> processorSupplier = new PrintedInternal<>(
+                Printed.<String, Integer>toFile(file.getPath()))
+                .build("processor");
+        final Processor<String, Integer> processor = processorSupplier.get();
+        processor.process("hi", 1);
+        processor.close();
+        try (final FileInputStream stream = new FileInputStream(file)) {
+            final byte[] data = new byte[stream.available()];
+            stream.read(data);
+            assertThat(new String(data, StandardCharsets.UTF_8.name()), equalTo("[processor]:
hi, 1\n"));
+        }
+    }
+
+    @Test
+    public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException
{
+        final ProcessorSupplier<String, Integer> supplier = new PrintedInternal<>(sysOutPrinter).build("processor");
+        supplier.get().process("good", 2);
+        assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]:
good, 2\n"));
+    }
+
+    @Test
+    public void shouldPrintWithLabel() throws UnsupportedEncodingException {
+        final Processor<String, Integer> processor = new PrintedInternal<>(sysOutPrinter.withLabel("label"))
+                .build("processor")
+                .get();
+
+        processor.process("hello", 3);
+        assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[label]: hello,
3\n"));
+    }
+
+    @Test
+    public void shouldPrintWithKeyValueMapper() throws UnsupportedEncodingException {
+        final Processor<String, Integer> processor = new PrintedInternal<>(sysOutPrinter.withKeyValueMapper(
+                new KeyValueMapper<String, Integer, String>() {
+                    @Override
+                    public String apply(final String key, final Integer value) {
+                        return String.format("%s -> %d", key, value);
+                    }
+                })).build("processor")
+                .get();
+        processor.process("hello", 1);
+        assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]:
hello -> 1\n"));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionIfFilePathIsNull() {
+        Printed.toFile(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionIfMapperIsNull() {
+        sysOutPrinter.withKeyValueMapper(null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionIfLabelIsNull() {
+        sysOutPrinter.withLabel(null);
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldThrowTopologyExceptionIfFilePathIsEmpty() {
+        Printed.toFile("");
+    }
+
+    @Test(expected = TopologyException.class)
+    public void shouldThrowTopologyExceptionIfFilePathDoesntExist() {
+        Printed.toFile("/this/should/not/exist");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 4ae1ea4..32c21fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -415,6 +416,11 @@ public class KStreamImplTest {
                         null);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnPrintIfPrintedIsNull() {
+        testStream.print((Printed) null);
+    }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerOnThroughWhenProducedIsNull() {
@@ -426,7 +432,6 @@ public class KStreamImplTest {
         testStream.to("topic", null);
     }
 
-
     @Test
     public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
         final KTable<String, String> table = builder.table("blah", consumed);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4769e3d9/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index c9bbb07..e1a014d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
@@ -61,7 +60,7 @@ public class KStreamPrintTest {
             }
         };
 
-        kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter,
mapper, "test-stream"), intSerd, stringSerd);
+        kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter,
mapper, "test-stream"));
 
         printProcessor = kStreamPrint.get();
         ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
@@ -85,22 +84,6 @@ public class KStreamPrintTest {
         doTest(inputRecords, expectedResult);
     }
 
-    @Test
-    @SuppressWarnings("unchecked")
-    public void testPrintKeyValueStringBytesArray() {
-
-        // we don't have a topic name because we don't need it for the test at this level
-        final List<KeyValue<byte[], byte[]>> inputRecords = Arrays.asList(
-                new KeyValue<>(intSerd.serializer().serialize(null, 0), stringSerd.serializer().serialize(null,
"zero")),
-                new KeyValue<>(intSerd.serializer().serialize(null, 1), stringSerd.serializer().serialize(null,
"one")),
-                new KeyValue<>(intSerd.serializer().serialize(null, 2), stringSerd.serializer().serialize(null,
"two")),
-                new KeyValue<>(intSerd.serializer().serialize(null, 3), stringSerd.serializer().serialize(null,
"three")));
-
-        final String[] expectedResult = {"[test-stream]: 0, zero", "[test-stream]: 1, one",
"[test-stream]: 2, two", "[test-stream]: 3, three"};
-
-        doTest(inputRecords, expectedResult);
-    }
-
     private void assertFlushData(final String[] expectedResult, final ByteArrayOutputStream
byteOutStream) {
 
         final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n");


Mime
View raw message