kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: add window store range query in simple benchmark (#4894)
Date Thu, 19 Apr 2018 23:14:36 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f523d9  MINOR: add window store range query in simple benchmark (#4894)
1f523d9 is described below

commit 1f523d9d72ad3008664146e1d2a5b40861b06072
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Apr 19 16:14:32 2018 -0700

    MINOR: add window store range query in simple benchmark (#4894)
    
    There are a couple minor additions in this PR:
    
    1. Add a new test for window store, to range query upon receiving each record.
    2. In the non-windowed state store case, add a get call before the put call.
    3. Enable caching by default to be consistent with other Join / Aggregate cases, where caching is enabled by default.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../apache/kafka/streams/perf/SimpleBenchmark.java | 58 +++++++++++++++++++++-
 .../streams/streams_simple_benchmark_test.py       |  4 +-
 2 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 9dd97c6..d956f27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -42,13 +42,16 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -187,6 +190,9 @@ public class SimpleBenchmark {
             case "streamprocesswithstatestore":
                 processStreamWithStateStore(SOURCE_TOPIC_ONE);
                 break;
+            case "streamprocesswithwindowstore":
+                processStreamWithWindowStore(SOURCE_TOPIC_ONE);
+                break;
             case "streamtablejoin":
                 streamTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                 break;
@@ -426,7 +432,7 @@ public class SimpleBenchmark {
         final StreamsBuilder builder = new StreamsBuilder();
         final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
                 = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), INTEGER_SERDE, BYTE_SERDE);
-        builder.addStateStore(storeBuilder);
+        builder.addStateStore(storeBuilder.withCachingEnabled());
 
         final KStream<Integer, byte[]> source = builder.stream(topic);
 
@@ -445,6 +451,56 @@ public class SimpleBenchmark {
 
                     @Override
                     public void process(final Integer key, final byte[] value) {
+                        store.get(key);
+                        store.put(key, value);
+                    }
+                };
+            }
+        }, "store");
+
+        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
+    }
+
+    private void processStreamWithWindowStore(final String topic) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-streams-with-store");
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder
+                = Stores.windowStoreBuilder(Stores.persistentWindowStore("store",
+                AGGREGATE_WINDOW_SIZE * 3,
+                3,
+                AGGREGATE_WINDOW_SIZE,
+                false),
+                INTEGER_SERDE, BYTE_SERDE);
+        builder.addStateStore(storeBuilder.withCachingEnabled());
+
+        final KStream<Integer, byte[]> source = builder.stream(topic);
+
+        source.peek(new CountDownAction(latch)).process(new ProcessorSupplier<Integer, byte[]>() {
+            @Override
+            public Processor<Integer, byte[]> get() {
+                return new AbstractProcessor<Integer, byte[]>() {
+                    WindowStore<Integer, byte[]> store;
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        super.init(context);
+                        store = (WindowStore<Integer, byte[]>) context.getStateStore("store");
+                    }
+
+                    @Override
+                    public void process(final Integer key, final byte[] value) {
+                        final long timestamp = context().timestamp();
+                        final KeyValueIterator<Windowed<Integer>, byte[]> iter = store.fetch(key - 10, key + 10, timestamp - 1000L, timestamp + 1000L);
+                        while (iter.hasNext()) {
+                            iter.next();
+                        }
+                        iter.close();
+
                         store.put(key, value);
                     }
                 };
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 0f1d33f..6e6d676 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -23,8 +23,8 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.version import DEV_BRANCH
 
-STREAMS_SIMPLE_TESTS = ["streamprocess", "streamprocesswithsink", "streamprocesswithstatestore"]
-STREAMS_COUNT_TESTS = ["streamcount", "streamcountwindowed", "streamprocesswithstatestore"]
+STREAMS_SIMPLE_TESTS = ["streamprocess", "streamprocesswithsink", "streamprocesswithstatestore", "streamprocesswithwindowstore"]
+STREAMS_COUNT_TESTS = ["streamcount", "streamcountwindowed"]
 STREAMS_JOIN_TESTS = ["streamtablejoin", "streamstreamjoin", "tabletablejoin"]
 NON_STREAMS_TESTS = ["consume", "consumeproduce"]
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message