kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3422: Add overloading functions without serdes in Streams DSL
Date Fri, 18 Mar 2016 19:39:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0d8cbbcb2 -> 5d0cd7667


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 00089ab..0407299 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -17,10 +17,10 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
@@ -36,15 +36,21 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     private final long retentionPeriod;
     private final boolean retainDuplicates;
     private final int numSegments;
-    private final StateSerdes<K, V> serdes;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
     private final Time time;
 
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, StateSerdes<K, V> serdes, Time time) {
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, null);
+    }
+
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time) {
         this.name = name;
         this.retentionPeriod = retentionPeriod;
         this.retainDuplicates = retainDuplicates;
         this.numSegments = numSegments;
-        this.serdes = serdes;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
         this.time = time;
     }
 
@@ -53,7 +59,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, serdes).enableLogging(), "rocksdb-window", time);
+        return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde).enableLogging(), "rocksdb-window", time);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 20c3a28..6f49b6a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -72,7 +72,7 @@ public class KTableImplTest {
         MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
         table3.toStream().process(proc3);
 
-        KTable<String, String> table4 = table1.through(topic2, stringSerde, stringSerde);
+        KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2);
 
         MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
         table4.toStream().process(proc4);
@@ -116,7 +116,7 @@ public class KTableImplTest {
                         }
                     });
             KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                    table1.through(topic2, stringSerde, stringSerde);
+                    table1.through(stringSerde, stringSerde, topic2);
 
             KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
             KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index aa3daeb..9ec1258 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -94,7 +94,7 @@ public class KTableMapValuesTest {
                         }
                     });
             KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                    table1.through(topic2, stringSerde, stringSerde);
+                    table1.through(stringSerde, stringSerde, topic2);
 
             KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
             KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index ce4956c..0a02824 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -94,7 +94,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
 
-        source.to("echo", stringSerde, intSerde);
+        source.to(stringSerde, intSerde, "echo");
 
         KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
             @Override
@@ -123,7 +123,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 intSerde
         ).toStream().map(
                 new Unwindow<String, Integer>()
-        ).to("min", stringSerde, intSerde);
+        ).to(stringSerde, intSerde, "min");
 
         KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min");
         minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
@@ -146,7 +146,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 intSerde
         ).toStream().map(
                 new Unwindow<String, Integer>()
-        ).to("max", stringSerde, intSerde);
+        ).to(stringSerde, intSerde, "max");
 
         KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max");
         maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
@@ -169,7 +169,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 longSerde
         ).toStream().map(
                 new Unwindow<String, Long>()
-        ).to("sum", stringSerde, longSerde);
+        ).to(stringSerde, longSerde, "sum");
 
 
         KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum");
@@ -181,7 +181,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 stringSerde
         ).toStream().map(
                 new Unwindow<String, Long>()
-        ).to("cnt", stringSerde, longSerde);
+        ).to(stringSerde, longSerde, "cnt");
 
         KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt");
         cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
@@ -193,7 +193,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return value1 - value2;
                     }
                 }
-        ).to("dif", stringSerde, intSerde);
+        ).to(stringSerde, intSerde, "dif");
 
         // avg
         sumTable.join(
@@ -203,7 +203,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return (double) value1 / (double) value2;
                     }
                 }
-        ).to("avg", stringSerde, doubleSerde);
+        ).to(stringSerde, doubleSerde, "avg");
 
         // windowed count
         data.countByKey(
@@ -216,7 +216,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return new KeyValue<>(key.value() + "@" + key.window().start(), value);
                     }
                 }
-        ).to("wcnt", stringSerde, longSerde);
+        ).to(stringSerde, longSerde, "wcnt");
 
         // test repartition
         Agg agg = new Agg();
@@ -229,7 +229,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 longSerde,
                 longSerde,
                 "cntByCnt"
-        ).to("tagg", stringSerde, longSerde);
+        ).to(stringSerde, longSerde, "tagg");
 
         return new KafkaStreams(builder, props);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index ffc97c3..502870b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -57,11 +57,13 @@ public class RocksDBWindowStoreTest {
     private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
     private final long retentionPeriod = segmentSize * (numSegments - 1);
     private final long windowSize = 3;
-    private final StateSerdes<Integer, String> serdes = StateSerdes.withBuiltinTypes("", Integer.class, String.class);
+    private final Serde<Integer> intSerde = Serdes.Integer();
+    private final Serde<String> stringSerde = Serdes.String();
+    private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", intSerde, stringSerde);
 
     @SuppressWarnings("unchecked")
-    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, StateSerdes<K, V> serdes) {
-        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, serdes, null);
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context) {
+        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, intSerde, stringSerde);
 
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context, store);
@@ -89,7 +91,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerde, byteArraySerde,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -185,7 +187,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerde, byteArraySerde,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -281,7 +283,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerde, byteArraySerde,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -377,7 +379,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerde, byteArraySerde,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -436,7 +438,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerde, byteArraySerde,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
             try {
@@ -553,7 +555,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerde, byteArraySerde,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context);
             try {
                 context.setTime(startTime);
                 store.put(0, "zero");
@@ -602,7 +604,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerde, byteArraySerde,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
@@ -654,7 +656,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerde, byteArraySerde,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
@@ -759,7 +761,7 @@ public class RocksDBWindowStoreTest {
 
             File storeDir = new File(baseDir, windowName);
 
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
@@ -775,7 +777,7 @@ public class RocksDBWindowStoreTest {
                 store.close();
             }
 
-            store = createWindowStore(context, serdes);
+            store = createWindowStore(context);
             inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
             try {


Mime
View raw message