kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: MINOR: use StoreBuilder in KStreamImpl rather than StateStoreSupplier
Date Tue, 19 Sep 2017 11:05:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c8f147199 -> c96f89f84


MINOR: use StoreBuilder in KStreamImpl rather than StateStoreSupplier

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3892 from dguy/cleanup-state-stores


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c96f89f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c96f89f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c96f89f8

Branch: refs/heads/trunk
Commit: c96f89f845f790d8e7bce45aae6c8c4d15a25660
Parents: c8f1471
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Sep 19 12:05:52 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Tue Sep 19 12:05:52 2017 +0100

----------------------------------------------------------------------
 .../streams/kstream/internals/KStreamImpl.java  | 29 ++++++++++----------
 1 file changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c96f89f8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 6ebbd14..cbaf95a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -38,9 +38,10 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
 
 import java.lang.reflect.Array;
 import java.util.Collections;
@@ -827,16 +828,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return groupByKey(Serialized.with(keySerde, valSerde));
     }
 
-    private static <K, V> StateStoreSupplier createWindowedStateStore(final JoinWindows windows,
-                                                                      final Serde<K> keySerde,
-                                                                      final Serde<V> valueSerde,
-                                                                      final String storeName) {
-        return Stores.create(storeName)
-            .withKeys(keySerde)
-            .withValues(valueSerde)
-            .persistent()
-            .windowed(windows.size(), windows.maintainMs(), windows.segments, true)
-            .build();
+    private static <K, V> StoreBuilder<WindowStore<K, V>> createWindowedStateStore(final JoinWindows windows,
+                                                                                   final Serde<K> keySerde,
+                                                                                   final Serde<V> valueSerde,
+                                                                                   final String storeName) {
+        return Stores.windowStoreBuilder(Stores.persistentWindowStore(storeName,
+                                                                      windows.maintainMs(),
+                                                                      windows.segments,
+                                                                      windows.size(),
+                                                                      true), keySerde, valueSerde);
+
     }
 
     private class KStreamImplJoin {
@@ -854,17 +855,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                    final KStream<K1, V2> other,
                                                    final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
                                                    final JoinWindows windows,
-                                                   final Joined joined) {
+                                                   final Joined<K1, V1, V2> joined) {
             String thisWindowStreamName = builder.newName(WINDOWED_NAME);
             String otherWindowStreamName = builder.newName(WINDOWED_NAME);
             String joinThisName = rightOuter ? builder.newName(OUTERTHIS_NAME) : builder.newName(JOINTHIS_NAME);
             String joinOtherName = leftOuter ? builder.newName(OUTEROTHER_NAME) : builder.newName(JOINOTHER_NAME);
             String joinMergeName = builder.newName(MERGE_NAME);
 
-            StateStoreSupplier thisWindow =
+            final StoreBuilder<WindowStore<K1, V1>> thisWindow =
                 createWindowedStateStore(windows, joined.keySerde(), joined.valueSerde(), joinThisName + "-store");
 
-            StateStoreSupplier otherWindow =
+            final StoreBuilder<WindowStore<K1, V2>> otherWindow =
                 createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(), joinOtherName + "-store");
 
 


Mime
View raw message