kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3794: added stream / table names as prefix to print / writeAsText
Date Wed, 06 Jul 2016 17:46:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 15e008783 -> b64302f9c


KAFKA-3794: added stream / table names as prefix to print / writeAsText

…int to the console.

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #1577 from bbejeck/KAFKA-3794-add-prefix-to-print-functions


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b64302f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b64302f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b64302f9

Branch: refs/heads/trunk
Commit: b64302f9cea0e3e95d740cbe7bbd4c64e147301c
Parents: 15e0087
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Wed Jul 6 10:46:36 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jul 6 10:46:36 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 71 +++++++++++++++++---
 .../apache/kafka/streams/kstream/KTable.java    | 54 ++++++++++++++-
 .../streams/kstream/internals/KStreamImpl.java  | 34 ++++++++--
 .../streams/kstream/internals/KTableImpl.java   | 34 ++++++++--
 .../kstream/internals/KeyValuePrinter.java      | 24 +++----
 .../internals/KeyValuePrinterProcessorTest.java | 32 +++++++--
 6 files changed, 213 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b64302f9/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 3ac0284..8133aa7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -91,38 +91,79 @@ public interface KStream<K, V> {
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
-     * Print the elements of this stream to System.out
+     * Print the elements of this stream to {@code System.out}.  This function
+     * will use the generated name of the parent processor node to label the key/value pairs
+     * printed out to the console.
      *
      * Implementors will need to override toString for keys and values that are not of
      * type String, Integer etc to get meaningful information.
      */
     void print();
 
+    /**
+     * Print the elements of this stream to {@code System.out}.  This function
+     * will use the given name to label the key/value printed out to the console.
+     *
+     * @param streamName the name used to label the key/value pairs printed out to the console
+     *
+     * Implementors will need to override toString for keys and values that are not of
+     * type String, Integer etc to get meaningful information.
+     */
+    void print(String streamName);
+
 
     /**
-     * Print the elements of this stream to System.out
+     * Print the elements of this stream to System.out.  This function
+     * will use the generated name of the parent processor node to label the key/value pairs
+     * printed out to the console.
      *
      * @param keySerde key serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
      * @param valSerde value serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
      *
-     *                 Implementors will need to override toString for keys and values that are not of
-     *                 type String, Integer etc to get meaningful information.
+     * Implementors will need to override toString for keys and values that are not of
+     * type String, Integer etc to get meaningful information.
      */
     void print(Serde<K> keySerde, Serde<V> valSerde);
 
+    /**
+     * Print the elements of this stream to System.out
+     *
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param streamName the name used to label the key/value pairs printed out to the console
+     *
+     * Implementors will need to override {@code toString} for keys and values that are not of
+     * type {@link String}, {@link Integer} etc. to get meaningful information.
+     */
+    void print(Serde<K> keySerde, Serde<V> valSerde, String streamName);
+
 
     /**
      * Write the elements of this stream to a file at the given path.
      *
      * @param filePath name of file to write to
      *
-     *                 Implementors will need to override toString for keys and values that are not of
-     *                 type String, Integer etc to get meaningful information.
+     * Implementors will need to override {@code toString} for keys and values that are not of
+     * type {@link String}, {@link Integer} etc. to get meaningful information.
      */
     void writeAsText(String filePath);
 
+
+    /**
+     * Write the elements of this stream to a file at the given path.
+     *
+     * @param filePath name of file to write to
+     * @param streamName the name used to label the key/value pairs printed out to the console
+     *
+     * Implementors will need to override {@code toString} for keys and values that are not of
+     * type {@link String}, {@link Integer} etc. to get meaningful information.
+     */
+    void writeAsText(String filePath, String streamName);
+
     /**
      * @param filePath name of file to write to
      * @param keySerde key serde used to send key-value pairs,
@@ -130,13 +171,27 @@ public interface KStream<K, V> {
      * @param valSerde value serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
      *
-     *                 Implementors will need to override toString for keys and values that are not of
-     *                 type String, Integer etc to get meaningful information.
+     * Implementors will need to override {@code toString} for keys and values that are not of
+     * type {@link String}, {@link Integer} etc. to get meaningful information.
      */
 
     void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
 
     /**
+     * @param filePath name of file to write to
+     * @param streamName the name used to label the key/value pairs printed out to the console
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     *
+     * Implementors will need to override {@code toString} for keys and values that are not of
+     * type {@link String}, {@link Integer} etc. to get meaningful information.
+     */
+
+    void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde);
+
+    /**
      * Create a new instance of {@link KStream} by transforming each element in this stream into zero or more elements in the new stream.
      *
      * @param mapper        the instance of {@link KeyValueMapper}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b64302f9/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 50d0595..c16b3d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -71,7 +71,9 @@ public interface KTable<K, V> {
 
 
     /**
-     * Print the elements of this stream to {@code System.out}
+     * Print the elements of this stream to {@code System.out}. This function
+     * will use the generated name of the parent processor node to label the key/value pairs
+     * printed out to the console.
      *
      * Implementors will need to override toString for keys and values that are not of
      * type String, Integer etc to get meaningful information.
@@ -79,6 +81,17 @@ public interface KTable<K, V> {
     void print();
 
     /**
+     * Print the elements of this stream to {@code System.out}.  This function
+     * will use the given name to label the key/value printed out to the console.
+     *
+     * @param streamName the name used to label the key/value pairs printed out to the console
+     *
+     * Implementors will need to override toString for keys and values that are not of
+     * type String, Integer etc to get meaningful information.
+     */
+    void print(String streamName);
+
+    /**
      * Print the elements of this stream to {@code System.out}
      * @param keySerde key serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
@@ -91,6 +104,20 @@ public interface KTable<K, V> {
     void print(Serde<K> keySerde, Serde<V> valSerde);
 
     /**
+     * Print the elements of this stream to System.out
+     *
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param streamName the name used to label the key/value pairs printed out to the console
+     *
+     * Implementors will need to override toString for keys and values that are not of
+     * type String, Integer etc to get meaningful information.
+     */
+    void print(Serde<K> keySerde, Serde<V> valSerde, String streamName);
+
+    /**
      * Write the elements of this stream to a file at the given path using default serializers and deserializers.
      * @param filePath name of file to write to
      *
@@ -103,6 +130,17 @@ public interface KTable<K, V> {
      * Write the elements of this stream to a file at the given path.
      *
      * @param filePath name of file to write to
+     * @param streamName the name used to label the key/value pairs printed out to the console
+     *
+     * Implementors will need to override {@code toString} for keys and values that are not of
+     * type {@link String}, {@link Integer} etc. to get meaningful information.
+     */
+    void writeAsText(String filePath, String streamName);
+
+    /**
+     * Write the elements of this stream to a file at the given path.
+     *
+     * @param filePath name of file to write to
      * @param keySerde key serde used to send key-value pairs,
      *                 if not specified the default serde defined in the configs will be used
      * @param valSerde value serde used to send key-value pairs,
@@ -114,6 +152,20 @@ public interface KTable<K, V> {
     void  writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
 
     /**
+     * @param filePath name of file to write to
+     * @param streamName the name used to label the key/value pairs printed out to the console
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     *
+     * Implementors will need to override {@code toString} for keys and values that are not of
+     * type {@link String}, {@link Integer} etc. to get meaningful information.
+     */
+
+    void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde);
+
+    /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
      * using default serializers and deserializers and producer's {@link DefaultPartitioner}.
      * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b64302f9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 52b1c7b..b79532e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -158,36 +158,60 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public void print() {
-        print(null, null);
+        print(null, null, null);
+    }
+
+    @Override
+    public void print(String streamName) {
+        print(null, null, streamName);
     }
 
     @Override
     public void print(Serde<K> keySerde, Serde<V> valSerde) {
+        print(keySerde, valSerde, null);
+    }
+
+    @Override
+    public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) {
         String name = topology.newName(PRINTING_NAME);
-        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde), this.name);
+        streamName = (streamName == null) ? this.name : streamName;
+        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, streamName), this.name);
     }
 
 
     @Override
     public void writeAsText(String filePath) {
-        writeAsText(filePath, null, null);
+        writeAsText(filePath, null, null, null);
+    }
+
+    @Override
+    public void writeAsText(String filePath, String streamName) {
+        writeAsText(filePath, streamName, null, null);
+    }
+
+
+    @Override
+    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+        writeAsText(filePath, null, keySerde, valSerde);
     }
 
     /**
      * @throws TopologyBuilderException if file is not found
      */
     @Override
-    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+    public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
         String name = topology.newName(PRINTING_NAME);
+        streamName = (streamName == null) ? this.name : streamName;
         try {
 
             PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
-            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
+            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde, streamName), this.name);
 
         } catch (FileNotFoundException e) {
             String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
             throw new TopologyBuilderException(message);
         }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/b64302f9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c5543ad..49d2762 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -131,30 +131,52 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     @Override
     public void print() {
-        print(null, null);
+        print(null, null, null);
+    }
+
+    @Override
+    public void print(String streamName) {
+        print(null, null, streamName);
     }
 
     @Override
     public void print(Serde<K> keySerde, Serde<V> valSerde) {
-        String name = topology.newName(PRINTING_NAME);
-        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde), this.name);
+        print(keySerde, valSerde, null);
     }
 
 
     @Override
+    public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) {
+        String name = topology.newName(PRINTING_NAME);
+        streamName = (streamName == null) ? this.name : streamName;
+        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, streamName), this.name);
+    }
+
+    @Override
     public void writeAsText(String filePath) {
-        writeAsText(filePath, null, null);
+        writeAsText(filePath, null, null, null);
+    }
+
+    @Override
+    public void writeAsText(String filePath, String streamName) {
+        writeAsText(filePath, streamName, null, null);
+    }
+
+    @Override
+    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+        writeAsText(filePath, null, keySerde, valSerde);
     }
 
     /**
      * @throws TopologyBuilderException if file is not found
      */
     @Override
-    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+    public void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde) {
         String name = topology.newName(PRINTING_NAME);
+        streamName = (streamName == null) ? this.name : streamName;
         try {
             PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
-            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
+            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde, streamName), this.name);
         } catch (FileNotFoundException e) {
             String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
             throw new TopologyBuilderException(message);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b64302f9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
index d1c1d8b..30203ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
@@ -33,11 +33,13 @@ class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
     private Serde<?> keySerde;
     private Serde<?> valueSerde;
     private boolean notStandardOut;
+    private String streamName;
 
 
-    KeyValuePrinter(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde) {
+    KeyValuePrinter(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde, String streamName) {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
+        this.streamName = streamName;
         if (printStream == null) {
             this.printStream = System.out;
         } else {
@@ -46,21 +48,17 @@ class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
         }
     }
 
-    KeyValuePrinter(PrintStream printStream) {
-        this(printStream, null, null);
+    KeyValuePrinter(PrintStream printStream, String streamName) {
+        this(printStream, null, null, streamName);
     }
 
-    KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde) {
-        this(System.out, keySerde, valueSerde);
-    }
-
-    KeyValuePrinter() {
-        this(System.out, null, null);
+    KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde, String streamName) {
+        this(System.out, keySerde, valueSerde, streamName);
     }
 
     @Override
     public Processor<K, V> get() {
-        return new KeyValuePrinterProcessor(this.printStream, this.keySerde, this.valueSerde);
+        return new KeyValuePrinterProcessor(this.printStream, this.keySerde, this.valueSerde, this.streamName);
     }
 
 
@@ -69,11 +67,13 @@ class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
         private Serde<?> keySerde;
         private Serde<?> valueSerde;
         private ProcessorContext processorContext;
+        private String streamName;
 
-        private KeyValuePrinterProcessor(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde) {
+        private KeyValuePrinterProcessor(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde, String streamName) {
             this.printStream = printStream;
             this.keySerde = keySerde;
             this.valueSerde = valueSerde;
+            this.streamName = streamName;
         }
 
         @Override
@@ -94,7 +94,7 @@ class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
             K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
             V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
 
-            printStream.println(keyToPrint + " , " + valueToPrint);
+            printStream.println("[" + this.streamName + "]: " + keyToPrint + " , " + valueToPrint);
 
             this.processorContext.forward(key, value);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b64302f9/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
index c8707af..d1bff05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
@@ -57,10 +57,34 @@ public class KeyValuePrinterProcessorTest {
     @Test
     public void testPrintKeyValueDefaultSerde() throws Exception {
 
-        KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printStream);
+        KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printStream, null);
         String[] suppliedKeys = {"foo", "bar", null};
         String[] suppliedValues = {"value1", "value2", "value3"};
-        String[] expectedValues = {"foo , value1", "bar , value2", "null , value3"};
+        String[] expectedValues = {"[null]: foo , value1", "[null]: bar , value2", "[null]: null , value3"};
+
+
+        KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName);
+        stream.process(keyValuePrinter);
+
+        driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < suppliedKeys.length; i++) {
+            driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
+        }
+
+        String[] capturedValues = new String(baos.toByteArray(), Charset.forName("UTF-8")).split("\n");
+
+        for (int i = 0; i < capturedValues.length; i++) {
+            assertEquals(capturedValues[i], expectedValues[i]);
+        }
+    }
+
+    @Test
+    public void testPrintKeyValuesWithName() throws Exception {
+
+        KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printStream, "test-stream");
+        String[] suppliedKeys = {"foo", "bar", null};
+        String[] suppliedValues = {"value1", "value2", "value3"};
+        String[] expectedValues = {"[test-stream]: foo , value1", "[test-stream]: bar , value2", "[test-stream]: null , value3"};
 
 
         KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName);
@@ -83,7 +107,7 @@ public class KeyValuePrinterProcessorTest {
     public void testPrintKeyValueWithProvidedSerde() throws Exception {
 
         Serde<MockObject> mockObjectSerde = Serdes.serdeFrom(new MockSerializer(), new MockDeserializer());
-        KeyValuePrinter<String, MockObject> keyValuePrinter = new KeyValuePrinter<>(printStream, stringSerde, mockObjectSerde);
+        KeyValuePrinter<String, MockObject> keyValuePrinter = new KeyValuePrinter<>(printStream, stringSerde, mockObjectSerde, null);
         KStream<String, MockObject> stream = builder.stream(stringSerde, mockObjectSerde, topicName);
 
         stream.process(keyValuePrinter);
@@ -94,7 +118,7 @@ public class KeyValuePrinterProcessorTest {
         byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));
 
         driver.process(topicName, suppliedKey, suppliedValue);
-        String expectedPrintedValue = "null , name:print label:test";
+        String expectedPrintedValue = "[null]: null , name:print label:test";
         String capturedValue = new String(baos.toByteArray(), Charset.forName("UTF-8")).trim();
 
         assertEquals(capturedValue, expectedPrintedValue);


Mime
View raw message