kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-2593: Key value stores can use specified serializers and deserializers
Date Wed, 14 Oct 2015 20:54:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f13d11559 -> 6e571225d


http://git-wip-us.apache.org/repos/asf/kafka/blob/6e571225/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index c0b09f6..761f5ce 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -23,31 +23,65 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
 
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
-public class MockProcessorContext implements ProcessorContext {
+public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
 
     private final KStreamTestDriver driver;
-    private final Serializer serializer;
-    private final Deserializer deserializer;
+    private final Serializer keySerializer;
+    private final Serializer valueSerializer;
+    private final Deserializer keyDeserializer;
+    private final Deserializer valueDeserializer;
+    private final RecordCollector.Supplier recordCollectorSupplier;
 
     private Map<String, StateStore> storeMap = new HashMap<>();
 
     long timestamp = -1L;
 
     public MockProcessorContext(KStreamTestDriver driver, Serializer<?> serializer,
Deserializer<?> deserializer) {
+        this(driver, serializer, deserializer, serializer, deserializer, (RecordCollector.Supplier)
null);
+    }
+
+    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer,
Deserializer<?> keyDeserializer,
+            Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+            final RecordCollector collector) {
+        this(driver, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
+                collector == null ? null : new RecordCollector.Supplier() {
+                    @Override
+                    public RecordCollector recordCollector() {
+                        return collector;
+                    }
+                });
+    }
+
+    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer,
Deserializer<?> keyDeserializer,
+            Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+            RecordCollector.Supplier collectorSupplier) {
         this.driver = driver;
-        this.serializer = serializer;
-        this.deserializer = deserializer;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.recordCollectorSupplier = collectorSupplier;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        if (recordCollectorSupplier == null) {
+            throw new UnsupportedOperationException("No RecordCollector specified");
+        }
+        return recordCollectorSupplier.recordCollector();
     }
 
     public void setTime(long timestamp) {
         this.timestamp = timestamp;
     }
 
+    @Override
     public int id() {
         return -1;
     }
@@ -59,22 +93,22 @@ public class MockProcessorContext implements ProcessorContext {
 
     @Override
     public Serializer<?> keySerializer() {
-        return serializer;
+        return keySerializer;
     }
 
     @Override
     public Serializer<?> valueSerializer() {
-        return serializer;
+        return valueSerializer;
     }
 
     @Override
     public Deserializer<?> keyDeserializer() {
-        return deserializer;
+        return keyDeserializer;
     }
 
     @Override
     public Deserializer<?> valueDeserializer() {
-        return deserializer;
+        return valueDeserializer;
     }
 
     @Override
@@ -140,4 +174,4 @@ public class MockProcessorContext implements ProcessorContext {
         return this.timestamp;
     }
 
-}
+}
\ No newline at end of file


Mime
View raw message