kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-3338: Add print and writeAsText to KStream/KTable in Kafka Streams
Date Mon, 11 Apr 2016 00:43:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8c5956576 -> 7c2798986


KAFKA-3338: Add print and writeAsText to KStream/KTable in Kafka Streams

    Addresses comments from previous PR [#1187]
    Changed the print and writeAsText method return signatures to void
    Flush System.out on close
    Changed IllegalStateException to TopologyBuilderException
    Updated MockProcessorContext.topic method to return a String
    Renamed KStreamPrinter to KeyValuePrinter
    Updated the printing of null keys to 'null' to match ConsoleConsumer
    Updated JavaDoc to state the need to override toString
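
    For illustration only (not part of this patch): a minimal sketch of how an
    application might call the new methods. The topic names, output path, and
    config values below are assumptions, not taken from the commit.

        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.KStreamBuilder;

        import java.util.Properties;

        public class PrintExample {
            public static void main(String[] args) {
                KStreamBuilder builder = new KStreamBuilder();

                // Source with explicit serdes, so keys and values arrive already deserialized.
                KStream<String, String> words = builder.stream(Serdes.String(), Serdes.String(), "words");

                // Print every record to System.out; a null key is printed as 'null'.
                words.print();

                // Dump the same records to a file; an unwritable path is reported as a
                // TopologyBuilderException at the time the topology is built.
                words.writeAsText("/tmp/words.txt");

                // Source without serdes: if the configured default serdes leave records as
                // byte[], the serde-taking overload lets the printer deserialize them first.
                KStream<String, String> raw = builder.stream("other-topic");
                raw.print(Serdes.String(), Serdes.String());

                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "print-example");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");

                new KafkaStreams(builder, new StreamsConfig(props)).start();
            }
        }

    Since print and writeAsText now return void, they end a DSL chain; to keep
    processing, hold on to the KStream/KTable reference and call them as a side
    branch, as in the sketch above.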

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Dan Norwood, Guozhang Wang

Closes #1209 from bbejeck/KAFKA-3338_Adding_print/writeAsText_to_Streams_DSL


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c279898
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c279898
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c279898

Branch: refs/heads/trunk
Commit: 7c27989860e7e063bfec3c679cb6bc8fd52abc84
Parents: 8c59565
Author: bbejeck <bbejeck@gmail.com>
Authored: Sun Apr 10 17:43:47 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Apr 10 17:43:47 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   |  46 ++++++
 .../apache/kafka/streams/kstream/KTable.java    |  43 +++++
 .../streams/kstream/internals/KStreamImpl.java  |  40 ++++-
 .../streams/kstream/internals/KTableImpl.java   |  38 ++++-
 .../kstream/internals/KeyValuePrinter.java      | 124 ++++++++++++++
 .../internals/KeyValuePrinterProcessorTest.java | 165 +++++++++++++++++++
 .../apache/kafka/test/MockProcessorContext.java |   2 +-
 7 files changed, 454 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7c279898/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index a55e726..27475aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -64,6 +64,52 @@ public interface KStream<K, V> {
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
+     * Print the elements of this stream to System.out
+     *
+     * Implementors will need to override toString for keys and values that are not of
+     * type String, Integer etc to get meaningful information.
+     */
+    void print();
+
+
+    /**
+     * Print the elements of this stream to System.out
+     *
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     *
+     *                 Implementors will need to override toString for keys and values that are not of
+     *                 type String, Integer etc to get meaningful information.
+     */
+    void print(Serde<K> keySerde, Serde<V> valSerde);
+
+
+    /**
+     * Write the elements of this stream to a file at the given path.
+     *
+     * @param filePath name of file to write to
+     *
+     *                 Implementors will need to override toString for keys and values that are not of
+     *                 type String, Integer etc to get meaningful information.
+     */
+    void writeAsText(String filePath);
+
+    /**
+     * @param filePath name of file to write to
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     *
+     *                 Implementors will need to override toString for keys and values that are not of
+     *                 type String, Integer etc to get meaningful information.
+     */
+
+    void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
+
+    /**
      * Create a new instance of {@link KStream} by transforming each element in this stream into zero or more elements in the new stream.
      *
      * @param mapper        the instance of {@link KeyValueMapper}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c279898/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1f6ee68..bb6878f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -53,6 +53,49 @@ public interface KTable<K, V> {
      */
     <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
+
+    /**
+     * Print the elements of this stream to System.out
+     *
+     * Implementors will need to override toString for keys and values that are not of
+     * type String, Integer etc to get meaningful information.
+     */
+    void print();
+
+    /**
+     * Print the elements of this stream to System.out
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     *
+     * Implementors will need to override toString for keys and values that are not of
+     * type String, Integer etc to get meaningful information.
+     */
+    void print(Serde<K> keySerde, Serde<V> valSerde);
+
+    /**
+     * Write the elements of this stream to a file at the given path.
+     * @param filePath name of file to write to
+     *
+     * Implementors will need to override toString for keys and values that are not of
+     * type String, Integer etc to get meaningful information.
+     */
+    void writeAsText(String filePath);
+
+    /**
+     *
+     * @param filePath name of file to write to
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default serde defined in the configs will be used
+     *
+     * Implementors will need to override toString for keys and values that are not of
+     * type String, Integer etc to get meaningful information.
+     */
+    void  writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
+
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
      * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c279898/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index c266328..9707aee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,13 +20,14 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -42,7 +43,9 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.Stores;
-
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.lang.reflect.Array;
 import java.util.HashSet;
 import java.util.Set;
@@ -79,6 +82,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
 
+    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
+
     private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
 
     private static final String SELECT_NAME = "KSTREAM-SELECT-";
@@ -136,6 +141,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
+    public void print() {
+        print(null, null);
+    }
+
+    @Override
+    public void print(Serde<K> keySerde, Serde<V> valSerde) {
+        String name = topology.newName(PRINTING_NAME);
+        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde), this.name);
+    }
+
+
+    @Override
+    public void writeAsText(String filePath) {
+        writeAsText(filePath, null, null);
+    }
+
+    @Override
+    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+        String name = topology.newName(PRINTING_NAME);
+        try {
+
+            PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
+            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
+
+        } catch (FileNotFoundException e) {
+            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
+            throw new TopologyBuilderException(message);
+        }
+    }
+
+    @Override
     public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
         String name = topology.newName(FLATMAP_NAME);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c279898/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 8de9a0b..adc8b91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,13 +19,14 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Reducer;
@@ -36,6 +37,9 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.Stores;
 
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.util.Collections;
 import java.util.Set;
 
@@ -69,6 +73,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
     public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
 
+    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
+
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
     private static final String SELECT_NAME = "KTABLE-SELECT-";
@@ -135,6 +141,36 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     }
 
     @Override
+    public void print() {
+        print(null, null);
+    }
+
+    @Override
+    public void print(Serde<K> keySerde, Serde<V> valSerde) {
+        String name = topology.newName(PRINTING_NAME);
+        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde), this.name);
+    }
+
+
+    @Override
+    public void writeAsText(String filePath) {
+        writeAsText(filePath, null, null);
+    }
+
+    @Override
+    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+        String name = topology.newName(PRINTING_NAME);
+        try {
+            PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
+            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
+        } catch (FileNotFoundException e) {
+            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
+            throw new TopologyBuilderException(message);
+        }
+    }
+
+
+    @Override
     public KTable<K, V> through(Serde<K> keySerde,
                                 Serde<V> valSerde,
                                 StreamPartitioner<K, V> partitioner,

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c279898/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
new file mode 100644
index 0000000..d1c1d8b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.io.PrintStream;
+
+
+class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
+
+    private final PrintStream printStream;
+    private Serde<?> keySerde;
+    private Serde<?> valueSerde;
+    private boolean notStandardOut;
+
+
+    KeyValuePrinter(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde) {
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        if (printStream == null) {
+            this.printStream = System.out;
+        } else {
+            this.printStream = printStream;
+            notStandardOut = true;
+        }
+    }
+
+    KeyValuePrinter(PrintStream printStream) {
+        this(printStream, null, null);
+    }
+
+    KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde) {
+        this(System.out, keySerde, valueSerde);
+    }
+
+    KeyValuePrinter() {
+        this(System.out, null, null);
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KeyValuePrinterProcessor(this.printStream, this.keySerde, this.valueSerde);
+    }
+
+
+    private class KeyValuePrinterProcessor extends AbstractProcessor<K, V> {
+        private final PrintStream printStream;
+        private Serde<?> keySerde;
+        private Serde<?> valueSerde;
+        private ProcessorContext processorContext;
+
+        private KeyValuePrinterProcessor(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde) {
+            this.printStream = printStream;
+            this.keySerde = keySerde;
+            this.valueSerde = valueSerde;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            this.processorContext = context;
+
+            if (this.keySerde == null) {
+                keySerde = this.processorContext.keySerde();
+            }
+
+            if (this.valueSerde == null) {
+                valueSerde = this.processorContext.valueSerde();
+            }
+        }
+
+        @Override
+        public void process(K key, V value) {
+            K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
+            V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
+
+            printStream.println(keyToPrint + " , " + valueToPrint);
+
+            this.processorContext.forward(key, value);
+        }
+
+
+        private Object maybeDeserialize(Object receivedElement, Deserializer<?> deserializer) {
+            if (receivedElement == null) {
+                return null;
+            }
+
+            if (receivedElement instanceof byte[]) {
+                return deserializer.deserialize(this.processorContext.topic(), (byte[]) receivedElement);
+            }
+
+            return receivedElement;
+        }
+
+        @Override
+        public void close() {
+            if (notStandardOut) {
+                this.printStream.close();
+            } else {
+                this.printStream.flush();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c279898/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
new file mode 100644
index 0000000..22948ab
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class KeyValuePrinterProcessorTest {
+
+    private String topicName = "topic";
+    private Serde<String> stringSerde = Serdes.String();
+    private Serde<byte[]> bytesSerde = Serdes.ByteArray();
+    private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    private KStreamBuilder builder = new KStreamBuilder();
+    private PrintStream printStream = new PrintStream(baos);
+
+
+    @Test
+    public void testPrintKeyValueDefaultSerde() throws Exception {
+
+        KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printStream);
+        String[] suppliedKeys = {"foo", "bar", null};
+        String[] suppliedValues = {"value1", "value2", "value3"};
+        String[] expectedValues = {"foo , value1", "bar , value2", "null , value3"};
+
+
+        KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName);
+        stream.process(keyValuePrinter);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < suppliedKeys.length; i++) {
+            driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
+        }
+
+        String[] capturedValues = new String(baos.toByteArray(), Charset.forName("UTF-8")).split("\n");
+
+        for (int i = 0; i < capturedValues.length; i++) {
+            assertEquals(capturedValues[i], expectedValues[i]);
+        }
+    }
+
+
+    @Test
+    public void testPrintKeyValueWithProvidedSerde() throws Exception {
+
+        Serde<MockObject> mockObjectSerde = Serdes.serdeFrom(new MockSerializer(), new MockDeserializer());
+        KeyValuePrinter<String, MockObject> keyValuePrinter = new KeyValuePrinter<>(printStream, stringSerde, mockObjectSerde);
+        KStream<String, MockObject> stream = builder.stream(stringSerde, mockObjectSerde, topicName);
+
+        stream.process(keyValuePrinter);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        String suppliedKey = null;
+        byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));
+
+        driver.process(topicName, suppliedKey, suppliedValue);
+        String expectedPrintedValue = "null , name:print label:test";
+        String capturedValue = new String(baos.toByteArray(), Charset.forName("UTF-8")).trim();
+
+        assertEquals(capturedValue, expectedPrintedValue);
+
+    }
+
+    private static class MockObject {
+        public String name;
+        public String label;
+
+        public MockObject() {
+        }
+
+        MockObject(String name, String label) {
+            this.name = name;
+            this.label = label;
+        }
+
+        @Override
+        public String toString() {
+            return "name:" + name + " label:" + label;
+        }
+    }
+
+
+    private static class MockDeserializer implements Deserializer<MockObject> {
+
+        private com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+
+        }
+
+        @Override
+        public MockObject deserialize(String topic, byte[] data) {
+            MockObject mockObject;
+            try {
+                mockObject = objectMapper.readValue(data, MockObject.class);
+            } catch (Exception e) {
+                throw new SerializationException(e);
+            }
+            return mockObject;
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+
+    private static class MockSerializer implements Serializer<MockObject> {
+        private final com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+
+        }
+
+        @Override
+        public byte[] serialize(String topic, MockObject data) {
+            try {
+                return objectMapper.writeValueAsBytes(data);
+            } catch (Exception e) {
+                throw new SerializationException("Error serializing JSON message", e);
+            }
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c279898/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 287af5a..1d478dd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -171,7 +171,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public String topic() {
-        throw new UnsupportedOperationException("topic() not supported.");
+        return "mockTopic";
     }
 
     @Override

