kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-4830; Augment KStream.print() to allow users pass in extra parameters in the printed string
Date Thu, 20 Jul 2017 13:47:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 84d2b6a01 -> fe4a469fc


KAFKA-4830; Augment KStream.print() to allow users pass in extra parameters in the printed
string

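This change extends `KStream#print()` so that callers can supply a
`KeyValueMapper<K, V, String>` that formats each record before it is printed.
It adds the following methods:
1. `KStream#print(KeyValueMapper<K, V, String>)`
2. `KStream#print(KeyValueMapper<K, V, String>, String label)`
3. `KStream#print(KeyValueMapper<K, V, String>, Serde<K>, Serde<V>)`
4. `KStream#print(KeyValueMapper<K, V, String>, Serde<K>, Serde<V>, String
label)`

Matching `KStream#writeAsText(...)` overloads that take a `KeyValueMapper` are added
as well, and the `streamName` parameter is renamed to `label`.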

Author: jameschien <jameschien@staff.ruten.com.tw>
Author: jedichien <james.chain1990@gmail.com>
Author: JamesChien <jedichien@users.noreply.github.com>
Author: JamesChien <james.chain1990@gmail.com>

Reviewers: Bill Bejeck <bbejec@gmail.com>, Damian Guy <damian.guy@gmail.com>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3085 from jedichien/KAFKA-4830


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe4a469f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe4a469f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe4a469f

Branch: refs/heads/trunk
Commit: fe4a469fce5a47b89083781ed8a03d0d71428aeb
Parents: 84d2b6a
Author: jameschien <jameschien@staff.ruten.com.tw>
Authored: Thu Jul 20 14:47:38 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Thu Jul 20 14:47:38 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 228 ++++++++++++++++++-
 .../apache/kafka/streams/kstream/KTable.java    |  16 +-
 .../streams/kstream/PrintForeachAction.java     |  16 +-
 .../streams/kstream/internals/KStreamImpl.java  |  93 +++++---
 .../streams/kstream/internals/KTableImpl.java   |  42 ++--
 .../kstream/internals/KStreamPrintTest.java     |  46 +++-
 6 files changed, 374 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 9637927..191931b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -286,9 +286,9 @@ public interface KStream<K, V> {
      * Implementors will need to override {@code toString()} for keys and values that are
not of type {@link String},
      * {@link Integer} etc. to get meaningful information.
      *
-     * @param streamName the name used to label the key/value pairs printed to the console
+     * @param label the name used to label the key/value pairs printed to the console
      */
-    void print(final String streamName);
+    void print(final String label);
 
     /**
      * Print the records of this stream to {@code System.out}.
@@ -318,11 +318,111 @@ public interface KStream<K, V> {
      *
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
-     * @param streamName the name used to label the key/value pairs printed to the console
+     * @param label the name used to label the key/value pairs printed to the console
      */
     void print(final Serde<K> keySerde,
                final Serde<V> valSerde,
-               final String streamName);
+               final String label);
+
+    /**
+     * Print the customized output with {@code System.out}.
+     * <p>
+     * The default serde will be used to deserialize key or value if type is {@code byte[]}.
+     * The user provided {@link KeyValueMapper} which customizes output is used to print
with {@code System.out}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     *
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     */
+    void print(final KeyValueMapper<? super K, ? super V, String> mapper);
+
+    /**
+     * Print the customized output with {@code System.out}.
+     * <p>
+     * The default serde will be used to deserialize key or value if type is {@code byte[]}.
+     * The user provided {@link KeyValueMapper} which customizes output is used to print
with {@code System.out}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     *
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     * @param label the name used to label the printed output.
+     */
+    void print(final KeyValueMapper<? super K, ? super V, String> mapper, final String
label);
+
+    /**
+     * Print the customized output with {@code System.out}.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is used to print
with {@code System.out}.
+     * The provided serde will be used to deserialize key or value if type is {@code byte[]}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The provided KeyValueMapper's mapped value type must be {@code String}.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are
not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     * @param keySerde a {@link Serde} used to deserialize key if type is {@code
byte[]}.
+     * @param valSerde a {@link Serde} used to deserialize value if type is {@code
byte[]}.
+     */
+    void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K>
keySerde, final Serde<V> valSerde);
+
+    /**
+     * Print the customized output with {@code System.out}.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is used to print
with {@code System.out}.
+     * The provided serde will be used to deserialize key or value if type is {@code byte[]}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The provided KeyValueMapper's mapped value type must be {@code String}.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are
not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     * @param keySerde a {@link Serde} used to deserialize key if type is {@code
byte[]}.
+     * @param valSerde a {@link Serde} used to deserialize value if type is {@code
byte[]}.
+     * @param label the name used to label the printed output.
+     */
+    void print(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K>
keySerde, final Serde<V> valSerde, final String label);
 
     /**
      * Write the records of this stream to a file at the given path.
@@ -350,10 +450,10 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param filePath   name of the file to write to
-     * @param streamName the name used to label the key/value pairs written to the file
+     * @param label the name used to label the key/value pairs written to the file
      */
     void writeAsText(final String filePath,
-                     final String streamName);
+                     final String label);
 
     /**
      * Write the records of this stream to a file at the given path.
@@ -385,16 +485,128 @@ public interface KStream<K, V> {
      * {@link Integer} etc. to get meaningful information.
      *
      * @param filePath   name of the file to write to
-     * @param streamName the name used to label the key/value pairs written to the file
+     * @param label the name used to label the key/value pairs written to the file
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
      */
     void writeAsText(final String filePath,
-                     final String streamName,
+                     final String label,
                      final Serde<K> keySerde,
                      final Serde<V> valSerde);
 
     /**
+     * Write the customised output to a given file path.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is used to write
to file.
+     * This function will use default name of stream to label records.
+     * <p>
+     * The default key and value serde will be used to deserialize {@code byte[]} records before
calling {@code toString()}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     *
+     * @param filePath path of the file to write to.
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     */
+    void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super V,
String> mapper);
+
+    /**
+     * Write the customised output to a given file path.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is used to write
to file.
+     * This function will use given name of stream to label records.
+     * <p>
+     * The default key and value serde will be used to deserialize {@code byte[]} records before
calling {@code toString()}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     *
+     * @param filePath path of the file to write to.
+     * @param label the name used to label records written to file.
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     */
+    void writeAsText(final String filePath, final String label, final KeyValueMapper<?
super K, ? super V, String> mapper);
+
+    /**
+     * Write the customised output to a given file path.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is used to write
to file.
+     * This function will use default name of stream to label records.
+     * <p>
+     * The given key and value serde will be used to deserialize {@code byte[]} records before
calling {@code toString()}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are
not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param filePath path of the file to write to.
+     * @param keySerde key serde used to deserialize key if type is {@code byte[]}.
+     * @param valSerde value serde used to deserialize value if type is {@code byte[]}.
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     */
+    void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V>
valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
+
+    /**
+     * Write the customised output to a given file path.
+     * <p>
+     * The user provided {@link KeyValueMapper} which customizes output is used to write
to file.
+     * This function will use given name of stream to label records.
+     * <p>
+     * The given key and value serde will be used to deserialize {@code byte[]} records before
calling {@code toString()}.
+     * <p>
+     * The example below shows the way to customize output data.
+     * <pre>{@code
+     * final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer,
String, String>() {
+     *     @Override
+     *     public String apply(Integer key, String value) {
+     *         return String.format("(%d, %s)", key, value);
+     *     }
+     * };
+     * }</pre>
+     * <p>
+     * The KeyValueMapper's mapped value type must be {@code String}.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are
not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param filePath path of the file to write to.
+     * @param label the name used to label records written to file.
+     * @param keySerde key serde used to deserialize key if type is {@code byte[]}.
+     * @param valSerde value serde used to deserialize value if type is {@code byte[]}.
+     * @param mapper a {@link KeyValueMapper} that computes output type {@code String}.
+     */
+    void writeAsText(final String filePath, final String label, final Serde<K> keySerde,
final Serde<V> valSerde, final KeyValueMapper<? super K, ? super V, String> mapper);
+
+    /**
      * Perform an action on each record of {@code KStream}.
      * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier,
String...)}).
      * Note that this is a terminal operation that returns void.

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 7f8ab6a..b7dd43e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -415,13 +415,13 @@ public interface KTable<K, V> {
      * Note that {@code print()} is not applied to the internal state store and only called
for each new {@code KTable}
      * update record.
      *
-     * @param streamName the name used to label the key/value pairs printed to the console
+     * @param label the name used to label the key/value pairs printed to the console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String,
QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable.
Alternatively
      * convert to a KStream using {@code toStream()} and then use {@link KStream#print(String)}
on the result.
      */
     @Deprecated
-    void print(final String streamName);
+    void print(final String label);
 
     /**
      * Print the update records of this {@code KTable} to {@code System.out}.
@@ -462,7 +462,7 @@ public interface KTable<K, V> {
      *
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
-     * @param streamName the name used to label the key/value pairs printed to the console
+     * @param label the name used to label the key/value pairs printed to the console
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String,
QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable.
Alternatively
      * convert to a KStream using {@code toStream()} and then use {@link KStream#print(Serde,
Serde, String)} on the result.
@@ -470,7 +470,7 @@ public interface KTable<K, V> {
     @Deprecated
     void print(final Serde<K> keySerde,
                final Serde<V> valSerde,
-               final String streamName);
+               final String label);
 
     /**
      * Write the update records of this {@code KTable} to a file at the given path.
@@ -508,14 +508,14 @@ public interface KTable<K, V> {
      * {@code KTable} update record.
      *
      * @param filePath   name of file to write to
-     * @param streamName the name used to label the key/value pairs printed out to the console
+     * @param label the name used to label the key/value pairs written to the file
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String,
QueryableStoreType) }
      * followed by {@link ReadOnlyKeyValueStore#all()}) to iterate over the keys of a KTable.
Alternatively
      * convert to a KStream using {@code toStream()} and then use {@link KStream#writeAsText(String,
String)} on the result.
      */
     @Deprecated
     void writeAsText(final String filePath,
-                     final String streamName);
+                     final String label);
 
     /**
      * Write the update records of this {@code KTable} to a file at the given path.
@@ -557,7 +557,7 @@ public interface KTable<K, V> {
      * {@code KTable} update record.
      *
      * @param filePath name of file to write to
-     * @param streamName the name used to label the key/value pairs printed to the console
+     * @param label the name used to label the key/value pairs written to the file
      * @param keySerde key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde value serde used to deserialize value if type is {@code byte[]}
      * @deprecated Use the Interactive Queries APIs (e.g., {@link KafkaStreams#store(String,
QueryableStoreType) }
@@ -567,7 +567,7 @@ public interface KTable<K, V> {
      */
     @Deprecated
     void writeAsText(final String filePath,
-                     final String streamName,
+                     final String label,
                      final Serde<K> keySerde,
                      final Serde<V> valSerde);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
index 3eb6d80..5e8ec28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
@@ -20,27 +20,29 @@ import java.io.PrintWriter;
 
 public class PrintForeachAction<K, V> implements ForeachAction<K, V> {
 
-    private final String streamName;
+    private final String label;
     private final PrintWriter printWriter;
-    
+    private final KeyValueMapper<? super K, ? super V, String> mapper;
     /**
-     * Print data message with given writer. The PrintWriter can be null in order to
+     * Print customized output with given writer. The PrintWriter can be null in order to
      * distinguish between {@code System.out} and the others. If the PrintWriter is {@code
PrintWriter(System.out)},
      * then it would close {@code System.out} output stream.
      * <p>
      * Therefore, do not pass in {@code PrintWriter(System.out)}; pass {@code null} instead.
      *
      * @param printWriter Use {@code System.out.println} if {@code null}.
-     * @param streamName The given name will be printed.
+     * @param mapper the mapper that lets the user customize the output to be printed.
+     * @param label the name used to label the printed output.
      */
-    public PrintForeachAction(final PrintWriter printWriter, final String streamName) {
+    public PrintForeachAction(final PrintWriter printWriter, final KeyValueMapper<? super
K, ? super V, String> mapper, final String label) {
         this.printWriter = printWriter;
-        this.streamName = streamName;
+        this.mapper = mapper;
+        this.label = label;
     }
 
     @Override
     public void apply(final K key, final V value) {
-        final String data = String.format("[%s]: %s, %s", streamName, key, value);
+        final String data = String.format("[%s]: %s", label, mapper.apply(key, value));
         if (printWriter == null) {
             System.out.println(data);
         } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 9cf8b38..ba537c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -103,6 +103,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements
KStream<K, V
 
     public static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
 
+    private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
 
     private final boolean repartitionRequired;
 
@@ -110,6 +111,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
                        boolean repartitionRequired) {
         super(topology, name, sourceNodes);
         this.repartitionRequired = repartitionRequired;
+        this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
+            @Override
+            public String apply(K key, V value) {
+                return String.format("%s, %s", key, value);
+            }
+        };
     }
 
     @Override
@@ -178,63 +185,98 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
 
     @Override
     public void print() {
-        print(null, null, null);
+        print(defaultKeyValueMapper, null, null, this.name);
+    }
+
+    @Override
+    public void print(final String label) {
+        print(defaultKeyValueMapper, null, null, label);
     }
 
     @Override
-    public void print(String streamName) {
-        print(null, null, streamName);
+    public void print(final Serde<K> keySerde, final Serde<V> valSerde) {
+        print(defaultKeyValueMapper, keySerde, valSerde, this.name);
     }
 
     @Override
-    public void print(Serde<K> keySerde, Serde<V> valSerde) {
-        print(keySerde, valSerde, null);
+    public void print(final Serde<K> keySerde, final Serde<V> valSerde, final
String label) {
+        print(defaultKeyValueMapper, keySerde, valSerde, label);
     }
 
     @Override
-    public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName)
{
+    public void print(final KeyValueMapper<? super K, ? super V, String> mapper) {
+        print(mapper, null, null, this.name);
+    }
+
+    @Override
+    public void print(final KeyValueMapper<? super K, ? super V, String> mapper, final
String label) {
+        print(mapper, null, null, label);
+    }
+
+    @Override
+    public void print(final KeyValueMapper<? super K, ? super V, String> mapper, final
Serde<K> keySerde, final Serde<V> valSerde) {
+        print(mapper, keySerde, valSerde, this.name);
+    }
+
+    @Override
+    public void print(KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K>
keySerde, Serde<V> valSerde, final String label) {
+        Objects.requireNonNull(mapper, "mapper can't be null");
+        Objects.requireNonNull(label, "label can't be null");
         String name = topology.newName(PRINTING_NAME);
-        streamName = (streamName == null) ? this.name : streamName;
-        topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null,
streamName), keySerde, valSerde), this.name);
+        topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(null,
mapper, label), keySerde, valSerde), this.name);
     }
 
+    @Override
+    public void writeAsText(final String filePath) {
+        writeAsText(filePath, this.name, null, null, defaultKeyValueMapper);
+    }
 
     @Override
-    public void writeAsText(String filePath) {
-        writeAsText(filePath, null, null, null);
+    public void writeAsText(final String filePath, final String label) {
+        writeAsText(filePath, label, null, null, defaultKeyValueMapper);
     }
 
     @Override
-    public void writeAsText(String filePath, String streamName) {
-        writeAsText(filePath, streamName, null, null);
+    public void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V>
valSerde) {
+        writeAsText(filePath, this.name, keySerde, valSerde, defaultKeyValueMapper);
     }
 
+    @Override
+    public void writeAsText(final String filePath, final String label, final Serde<K>
keySerde, final Serde<V> valSerde) {
+        writeAsText(filePath, label, keySerde, valSerde, defaultKeyValueMapper);
+    }
 
     @Override
-    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde)
{
-        writeAsText(filePath, null, keySerde, valSerde);
+    public void writeAsText(final String filePath, final KeyValueMapper<? super K, ? super
V, String> mapper) {
+        writeAsText(filePath, this.name, null, null, mapper);
+    }
+
+    @Override
+    public void writeAsText(final String filePath, final String label, final KeyValueMapper<?
super K, ? super V, String> mapper) {
+        writeAsText(filePath, label, null, null, mapper);
+    }
+
+    @Override
+    public void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V>
valSerde, final KeyValueMapper<? super K, ? super V, String> mapper) {
+        writeAsText(filePath, this.name, keySerde, valSerde, mapper);
     }
 
-    /**
-     * @throws TopologyBuilderException if file is not found
-     */
     @Override
-    public void writeAsText(String filePath, String streamName, Serde<K> keySerde,
Serde<V> valSerde) {
+    public void writeAsText(final String filePath, final String label, final Serde<K>
keySerde, final Serde<V> valSerde, KeyValueMapper<? super K, ? super V, String>
mapper) {
         Objects.requireNonNull(filePath, "filePath can't be null");
+        Objects.requireNonNull(label, "label can't be null");
+        Objects.requireNonNull(mapper, "mapper can't be null");
         if (filePath.trim().isEmpty()) {
             throw new TopologyBuilderException("filePath can't be an empty string");
         }
-        String name = topology.newName(PRINTING_NAME);
-        streamName = (streamName == null) ? this.name : streamName;
+        final String name = topology.newName(PRINTING_NAME);
         try {
-            PrintWriter printWriter = null;
-            printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
-            topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter,
streamName), keySerde, valSerde), this.name);
+            PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
+            topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction<>(printWriter,
mapper, label), keySerde, valSerde), this.name);
         } catch (FileNotFoundException | UnsupportedEncodingException e) {
             String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
             throw new TopologyBuilderException(message);
         }
-
     }
 
     @Override
@@ -678,9 +720,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements
KStream<K, V
                                         this.repartitionRequired);
     }
 
-
-
-
     private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows
windows,
                                                                      final Serde<K>
keySerde,
                                                                      final Serde<V>
valueSerde,

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 912f42c..679efe5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -75,6 +75,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements
KTable<K,
 
     private final ProcessorSupplier<?, ?> processorSupplier;
 
+    private final KeyValueMapper<K, V, String> defaultKeyValueMapper;
+
     private final String queryableStoreName;
     private final boolean isQueryable;
 
@@ -94,6 +96,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
         this.keySerde = null;
         this.valSerde = null;
         this.isQueryable = isQueryable;
+        this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
+            @Override
+            public String apply(K key, V value) {
+                return String.format("%s, %s", key, value);
+            }
+        };
     }
 
     public KTableImpl(KStreamBuilder topology,
@@ -110,6 +118,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.isQueryable = isQueryable;
+        this.defaultKeyValueMapper = new KeyValueMapper<K, V, String>() {
+            @Override
+            public String apply(K key, V value) {
+                return String.format("%s, %s", key, value);
+            }
+        };
     }
 
     @Override
@@ -226,56 +240,56 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public void print() {
-        print(null, null, null);
+        print(null, null, this.name);
     }
 
     @Override
-    public void print(String streamName) {
-        print(null, null, streamName);
+    public void print(String label) {
+        print(null, null, label);
     }
 
     @Override
     public void print(Serde<K> keySerde, Serde<V> valSerde) {
-        print(keySerde, valSerde, null);
+        print(keySerde, valSerde, this.name);
     }
 
 
     @Override
-    public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) {
+    public void print(Serde<K> keySerde, final Serde<V> valSerde, String label) {
+        Objects.requireNonNull(label, "label can't be null");
         String name = topology.newName(PRINTING_NAME);
-        streamName = (streamName == null) ? this.name : streamName;
-        topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, streamName), keySerde, valSerde), this.name);
+        topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
     }
 
     @Override
     public void writeAsText(String filePath) {
-        writeAsText(filePath, null, null, null);
+        writeAsText(filePath, this.name, null, null);
     }
 
     @Override
-    public void writeAsText(String filePath, String streamName) {
-        writeAsText(filePath, streamName, null, null);
+    public void writeAsText(String filePath, String label) {
+        writeAsText(filePath, label, null, null);
     }
 
     @Override
     public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
-        writeAsText(filePath, null, keySerde, valSerde);
+        writeAsText(filePath, this.name, keySerde, valSerde);
     }
 
     /**
      * @throws TopologyBuilderException if file is not found
      */
     @Override
-    public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
+    public void writeAsText(String filePath, String label, Serde<K> keySerde, Serde<V> valSerde) {
         Objects.requireNonNull(filePath, "filePath can't be null");
+        Objects.requireNonNull(label, "label can't be null");
         if (filePath.trim().isEmpty()) {
             throw new TopologyBuilderException("filePath can't be an empty string");
         }
         String name = topology.newName(PRINTING_NAME);
-        streamName = (streamName == null) ? this.name : streamName;
         try {
             PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
-            topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, streamName), keySerde, valSerde), this.name);
+            topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
         } catch (FileNotFoundException | UnsupportedEncodingException e) {
             String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
             throw new TopologyBuilderException(message);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe4a469f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index c537e0a..c94b868 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -63,7 +64,13 @@ public class KStreamPrintTest {
     
     @Test
     public void testPrintKeyValueWithName() {
-        final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new PrintForeachAction(printWriter, "test-stream"), intSerd, stringSerd);
+        KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
+            @Override
+            public String apply(Integer key, String value) {
+                return String.format("%d, %s", key, value);
+            }
+        };
+        final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, "test-stream"), intSerd, stringSerd);
 
         final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
                 new KeyValue<>(0, "zero"),
@@ -82,9 +89,42 @@ public class KStreamPrintTest {
             driver.process(topicName, record.key, record.value);
         }
         printWriter.flush();
-        final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\n");
+        final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n");
         for (int i = 0; i < flushOutDatas.length; i++) {
-            assertEquals(flushOutDatas[i], expectedResult[i]);
+            assertEquals(expectedResult[i], flushOutDatas[i]);
+        }
+    }
+
+    @Test
+    public void testPrintStreamWithProvidedKeyValueMapper() {
+        final KeyValueMapper<Integer, String, String> mapper = new KeyValueMapper<Integer, String, String>() {
+            @Override
+            public String apply(Integer key, String value) {
+                return String.format("(%d, %s)", key, value);
+            }
+        };
+        final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, "test-stream"), intSerd, stringSerd);
+
+        final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
+                new KeyValue<>(0, "zero"),
+                new KeyValue<>(1, "one"),
+                new KeyValue<>(2, "two"),
+                new KeyValue<>(3, "three"));
+
+        final String[] expectedResult = {"[test-stream]: (0, zero)", "[test-stream]: (1, one)", "[test-stream]: (2, two)", "[test-stream]: (3, three)"};
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
+        stream.process(kStreamPrint);
+
+        driver = new KStreamTestDriver(builder);
+        for (KeyValue<Integer, String> record: inputRecords) {
+            driver.process(topicName, record.key, record.value);
+        }
+        printWriter.flush();
+        final String[] results = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\\r*\\n");
+        for (int i = 0; i < results.length; i++) {
+            assertEquals(expectedResult[i], results[i]);
         }
     }
 


Mime
View raw message