kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3016: phase-1. A local store for join window
Date Tue, 05 Jan 2016 00:47:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 57df460f8 -> b0b3e5aeb


http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 747b4f1..a6a29cd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -26,10 +26,12 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Entry;
 
 import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
@@ -40,20 +42,19 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     private final Deserializer keyDeserializer;
     private final Deserializer valueDeserializer;
     private final RecordCollector.Supplier recordCollectorSupplier;
+    private final File stateDir;
 
     private Map<String, StateStore> storeMap = new HashMap<>();
+    private Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
 
     long timestamp = -1L;
 
-    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> serializer, Deserializer<?> deserializer) {
-        this(driver, serializer, deserializer, serializer, deserializer, (RecordCollector.Supplier) null);
-    }
-
-    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-            Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
-            final RecordCollector collector) {
-        this(driver, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
-                collector == null ? null : new RecordCollector.Supplier() {
+    public MockProcessorContext(KStreamTestDriver driver, File stateDir,
+                                Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
+                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+                                final RecordCollector collector) {
+        this(driver, stateDir, keySerializer, keyDeserializer, valueSerializer, valueDeserializer,
+                new RecordCollector.Supplier() {
                     @Override
                     public RecordCollector recordCollector() {
                         return collector;
@@ -61,10 +62,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
                 });
     }
 
-    public MockProcessorContext(KStreamTestDriver driver, Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
-            Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
-            RecordCollector.Supplier collectorSupplier) {
+    public MockProcessorContext(KStreamTestDriver driver, File stateDir,
+                                Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
+                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+                                RecordCollector.Supplier collectorSupplier) {
         this.driver = driver;
+        this.stateDir = stateDir;
         this.keySerializer = keySerializer;
         this.valueSerializer = valueSerializer;
         this.keyDeserializer = keyDeserializer;
@@ -74,10 +77,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public RecordCollector recordCollector() {
-        if (recordCollectorSupplier == null) {
+        RecordCollector recordCollector = recordCollectorSupplier.recordCollector();
+
+        if (recordCollector == null) {
             throw new UnsupportedOperationException("No RecordCollector specified");
         }
-        return recordCollectorSupplier.recordCollector();
+        return recordCollector;
     }
 
     public void setTime(long timestamp) {
@@ -111,7 +116,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public File stateDir() {
-        return driver.stateDir;
+        if (stateDir == null)
+            throw new UnsupportedOperationException("State directory not specified");
+
+        return stateDir;
     }
 
     @Override
@@ -130,6 +138,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     @Override
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
         storeMap.put(store.name(), store);
+        restoreFuncs.put(store.name(), func);
     }
 
     @Override
@@ -182,4 +191,11 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     public Map<String, StateStore> allStateStores() {
         return Collections.unmodifiableMap(storeMap);
     }
+
+    public void restore(String storeName, List<Entry<byte[], byte[]>> changeLog) {
+        StateRestoreCallback restoreCallback = restoreFuncs.get(storeName);
+        for (Entry<byte[], byte[]> entry : changeLog) {
+            restoreCallback.restore(entry.key(), entry.value());
+        }
+    }
 }


Mime
View raw message