kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-4016: Added join benchmarks
Date: Fri, 19 Aug 2016 22:23:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 33447cb4b -> c5d26c482


KAFKA-4016: Added join benchmarks

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ismael Juma, Damian Guy, Guozhang Wang

Closes #1700 from enothereska/join-benchmarks


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c5d26c48
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c5d26c48
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c5d26c48

Branch: refs/heads/trunk
Commit: c5d26c4829583c95af7ca9e961a4d3954f8e09eb
Parents: 33447cb
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Fri Aug 19 15:23:30 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Aug 19 15:23:30 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/perf/SimpleBenchmark.java     | 325 +++++++++++++++----
 .../streams/streams_simple_benchmark_test.py    |   2 +-
 .../services/performance/streams_performance.py |   6 +-
 3 files changed, 270 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
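For orientation before the diff: each new join benchmark follows the same shape. Seed the two input topics, build a topology whose join output counts down a latch, start the topology, and time how long the latch takes to drain; throughput is then reported in MB/s over numRecords * (KEY_SIZE + VALUE_SIZE) bytes. Below is a minimal, self-contained sketch of that pattern for the KStream-KTable case. It is not the committed code: the class name, topic names, record count, and broker address are illustrative, and the pass-through joiner stands in for the patch's VALUE_JOINER, which concatenates the two values.

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class JoinBenchmarkSketch {

    public static void main(String[] args) throws InterruptedException {
        final int numRecords = 1000000; // illustrative; the patch defaults to 10000000
        final CountDownLatch latch = new CountDownLatch(numRecords);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-benchmark-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStreamBuilder builder = new KStreamBuilder();
        KStream<Integer, byte[]> stream = builder.stream("joinSourceTopic1kStreamKTable");
        KTable<Integer, byte[]> table = builder.table("joinSourceTopic2kStreamKTable", "table-store");

        // Left join: with both topics seeded over the same key range,
        // every stream record finds exactly one table row.
        stream.leftJoin(table, new ValueJoiner<byte[], byte[], byte[]>() {
            @Override
            public byte[] apply(byte[] value1, byte[] value2) {
                return value1; // the committed VALUE_JOINER concatenates the two values instead
            }
        }).foreach(new ForeachAction<Integer, byte[]>() {
            @Override
            public void apply(Integer key, byte[] value) {
                latch.countDown(); // one count per joined record
            }
        });

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        long start = System.currentTimeMillis();
        latch.await(); // drains once every record has been joined
        long elapsedMs = System.currentTimeMillis() - start;

        // Same shape as the patch's megaBytePerSec(time, numRecords, recordSizeBytes)
        int recordSizeBytes = 8 + 100; // KEY_SIZE + VALUE_SIZE
        double mbPerSec = ((double) recordSizeBytes * numRecords / 1024 / 1024) / (elapsedMs / 1000.0);
        System.out.println("KStream-KTable LeftJoin [MB/s joined]: " + mbPerSec);

        streams.close();
    }
}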


http://git-wip-us.apache.org/repos/asf/kafka/blob/c5d26c48/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index a4b5345..5495416 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -28,14 +28,18 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -47,6 +51,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.Properties;
+import java.util.Random;
 
 public class SimpleBenchmark {
 
@@ -57,14 +62,33 @@ public class SimpleBenchmark {
     private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
     private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
 
-    private static final long NUM_RECORDS = 10000000L;
-    private static final Long END_KEY = NUM_RECORDS - 1;
+    private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
+    private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
+    private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
+        @Override
+        public byte[] apply(final byte[] value1, final byte[] value2) {
+            if (value1 == null && value2 == null)
+                return new byte[VALUE_SIZE];
+            if (value1 == null && value2 != null)
+                return value2;
+            if (value1 != null && value2 == null)
+                return value1;
+
+            byte[] tmp = new byte[value1.length + value2.length];
+            System.arraycopy(value1, 0, tmp, 0, value1.length);
+            System.arraycopy(value2, 0, tmp, value1.length, value2.length);
+            return tmp;
+        }
+    };
+
+    private static int numRecords;
+    private static Integer endKey;
     private static final int KEY_SIZE = 8;
     private static final int VALUE_SIZE = 100;
     private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
 
     private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
-    private static final Serde<Long> LONG_SERDE = Serdes.Long();
+    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
 
     public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
         super();
@@ -77,6 +101,8 @@ public class SimpleBenchmark {
         String kafka = args.length > 0 ? args[0] : "localhost:9092";
         String zookeeper = args.length > 1 ? args[1] : "localhost:2181";
         String stateDirStr = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark";
+        numRecords = args.length > 3 ? Integer.parseInt(args[3]) : 10000000;
+        endKey = numRecords - 1;
 
         final File stateDir = new File(stateDirStr);
         stateDir.mkdir();
@@ -88,25 +114,130 @@ public class SimpleBenchmark {
         System.out.println("kafka=" + kafka);
         System.out.println("zookeeper=" + zookeeper);
         System.out.println("stateDir=" + stateDir);
+        System.out.println("numRecords=" + numRecords);
 
         SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
 
         // producer performance
-        benchmark.produce();
+        benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
         // consumer performance
-        benchmark.consume();
+        benchmark.consume(SOURCE_TOPIC);
         // simple stream performance source->process
-        benchmark.processStream();
+        benchmark.processStream(SOURCE_TOPIC);
         // simple stream performance source->sink
-        benchmark.processStreamWithSink();
+        benchmark.processStreamWithSink(SOURCE_TOPIC);
         // simple stream performance source->store
-        benchmark.processStreamWithStateStore();
+        benchmark.processStreamWithStateStore(SOURCE_TOPIC);
+        // simple streams performance KSTREAM-KTABLE join
+        benchmark.kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "kStreamKTable", JOIN_TOPIC_2_PREFIX + "kStreamKTable");
+        // simple streams performance KSTREAM-KSTREAM join
+        benchmark.kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "kStreamKStream", JOIN_TOPIC_2_PREFIX + "kStreamKStream");
+        // simple streams performance KTABLE-KTABLE join
+        benchmark.kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "kTableKTable", JOIN_TOPIC_2_PREFIX + "kTableKTable");
+    }
+
+    private Properties setJoinProperties(final String applicationId) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
+        return props;
+    }
+
+    /**
+     * Measure the performance of a KStream-KTable left join. The setup is such that each
+     * KStream record joins to exactly one element in the KTable
+     */
+    public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
+        CountDownLatch latch = new CountDownLatch(numRecords);
+
+        // initialize topics
+        System.out.println("Initializing kStreamTopic " + kStreamTopic);
+        produce(kStreamTopic, VALUE_SIZE, "simple-benchmark-produce-kstream", numRecords, false, numRecords, false);
+        System.out.println("Initializing kTableTopic " + kTableTopic);
+        produce(kTableTopic, VALUE_SIZE, "simple-benchmark-produce-ktable", numRecords, true, numRecords, false);
+
+        // setup join
+        Properties props = setJoinProperties("simple-benchmark-kstream-ktable-join");
+        final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
+
+        // run benchmark
+        runJoinBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [MB/s joined]: ", latch);
+    }
+
+    /**
+     * Measure the performance of a KStream-KStream left join. The setup is such that each
+     * KStream record joins to exactly one element in the other KStream
+     */
+    public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
+        CountDownLatch latch = new CountDownLatch(numRecords);
+
+        // initialize topics
+        System.out.println("Initializing kStreamTopic " + kStreamTopic1);
+        produce(kStreamTopic1, VALUE_SIZE, "simple-benchmark-produce-kstream-topic1", numRecords, true, numRecords, false);
+        System.out.println("Initializing kStreamTopic " + kStreamTopic2);
+        produce(kStreamTopic2, VALUE_SIZE, "simple-benchmark-produce-kstream-topic2", numRecords, true, numRecords, false);
+
+        // setup join
+        Properties props = setJoinProperties("simple-benchmark-kstream-kstream-join");
+        final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, latch);
+
+        // run benchmark
+        runJoinBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [MB/s joined]: ", latch);
+    }
+
+    /**
+     * Measure the performance of a KTable-KTable left join. The setup is such that each
+     * KTable record joins to exactly one element in the other KTable
+     */
+    public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
+        CountDownLatch latch = new CountDownLatch(numRecords);
+
+        // initialize topics
+        System.out.println("Initializing kTableTopic " + kTableTopic1);
+        produce(kTableTopic1, VALUE_SIZE, "simple-benchmark-produce-ktable-topic1", numRecords, true, numRecords, false);
+        System.out.println("Initializing kTableTopic " + kTableTopic2);
+        produce(kTableTopic2, VALUE_SIZE, "simple-benchmark-produce-ktable-topic2", numRecords, true, numRecords, false);
+
+        // setup join
+        Properties props = setJoinProperties("simple-benchmark-ktable-ktable-join");
+        final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, kTableTopic1, kTableTopic2, latch);
+
+        // run benchmark
+        runJoinBenchmark(streams, "Streams KTableKTable LeftJoin Performance [MB/s joined]: ", latch);
+    }
+
+    private void runJoinBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
+        streams.start();
+
+        long startTime = System.currentTimeMillis();
+
+        while (latch.getCount() > 0) {
+            try {
+                latch.await();
+            } catch (InterruptedException ex) {
+                //ignore
+            }
+        }
+        long endTime = System.currentTimeMillis();
+
+
+        System.out.println(nameOfBenchmark + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + VALUE_SIZE));
+
+        streams.close();
     }
 
-    public void processStream() {
+
+
+    public void processStream(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreams(stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, zookeeper, latch);
 
         Thread thread = new Thread() {
             public void run() {
@@ -137,10 +268,10 @@ public class SimpleBenchmark {
         }
     }
 
-    public void processStreamWithSink() {
+    public void processStreamWithSink(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreamsWithSink(stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, zookeeper, latch);
 
         Thread thread = new Thread() {
             public void run() {
@@ -171,10 +302,10 @@ public class SimpleBenchmark {
         }
     }
 
-    public void processStreamWithStateStore() {
+    public void processStreamWithStateStore(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(stateDir, kafka, zookeeper, latch);
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, zookeeper, latch);
 
         Thread thread = new Thread() {
             public void run() {
@@ -205,54 +336,76 @@ public class SimpleBenchmark {
         }
     }
 
-    public void produce() {
+    /**
+     * Produce values to a topic
+     * @param topic Topic to produce to
+     * @param valueSizeBytes Size of value in bytes
+     * @param clientId String specifying client ID
+     * @param numRecords Number of records to produce
+     * @param sequential if True, then keys are produced sequentially from 0 to upperRange. In this case upperRange must be >= numRecords.
+     *                   if False, then keys are produced randomly in range [0, upperRange)
+     * @param printStats if True, print stats on how long producing took. If False, don't print stats. False can be used
+     *                   when this produce step is part of another benchmark that produces its own stats
+     */
+    public void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
+                        int upperRange, boolean printStats) throws Exception {
+
+        if (sequential) {
+            if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
+        }
         Properties props = new Properties();
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "simple-benchmark-produce");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        int key = 0;
+        Random rand = new Random();
+        KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props);
 
-        KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);
-
-        byte[] value = new byte[VALUE_SIZE];
+        byte[] value = new byte[valueSizeBytes];
         long startTime = System.currentTimeMillis();
 
-        for (int i = 0; i < NUM_RECORDS; i++) {
-            producer.send(new ProducerRecord<>(SOURCE_TOPIC, (long) i, value));
+        if (sequential) key = 0;
+        else key = rand.nextInt(upperRange);
+        for (int i = 0; i < numRecords; i++) {
+            producer.send(new ProducerRecord<>(topic, key, value));
+            if (sequential) key++;
+            else key = rand.nextInt(upperRange);
         }
         producer.close();
 
         long endTime = System.currentTimeMillis();
 
-        System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime
- startTime));
+        if (printStats)
+            System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime
- startTime, numRecords, KEY_SIZE + valueSizeBytes));
     }
 
-    public void consume() {
+    public void consume(String topic) {
         Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
-        KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
+        KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
 
-        List<TopicPartition> partitions = getAllPartitions(consumer, SOURCE_TOPIC);
+        List<TopicPartition> partitions = getAllPartitions(consumer, topic);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
-        Long key = null;
+        Integer key = null;
 
         long startTime = System.currentTimeMillis();
 
         while (true) {
-            ConsumerRecords<Long, byte[]> records = consumer.poll(500);
+            ConsumerRecords<Integer, byte[]> records = consumer.poll(500);
             if (records.isEmpty()) {
-                if (END_KEY.equals(key))
+                if (endKey.equals(key))
                     break;
             } else {
-                for (ConsumerRecord<Long, byte[]> record : records) {
-                    Long recKey = record.key();
+                for (ConsumerRecord<Integer, byte[]> record : records) {
+                    Integer recKey = record.key();
 
                     if (key == null || key < recKey)
                         key = recKey;
@@ -266,7 +419,7 @@ public class SimpleBenchmark {
         System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime
- startTime));
     }
 
-    private KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+    private KafkaStreams createKafkaStreams(String topic, File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -277,20 +430,20 @@ public class SimpleBenchmark {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE,
topic);
 
-        source.process(new ProcessorSupplier<Long, byte[]>() {
+        source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
-            public Processor<Long, byte[]> get() {
-                return new Processor<Long, byte[]>() {
+            public Processor<Integer, byte[]> get() {
+                return new Processor<Integer, byte[]>() {
 
                     @Override
                     public void init(ProcessorContext context) {
                     }
 
                     @Override
-                    public void process(Long key, byte[] value) {
-                        if (END_KEY.equals(key)) {
+                    public void process(Integer key, byte[] value) {
+                        if (endKey.equals(key)) {
                             latch.countDown();
                         }
                     }
@@ -309,7 +462,7 @@ public class SimpleBenchmark {
         return new KafkaStreams(builder, props);
     }
 
-    private KafkaStreams createKafkaStreamsWithSink(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+    private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -320,21 +473,21 @@ public class SimpleBenchmark {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE,
topic);
 
-        source.to(LONG_SERDE, BYTE_SERDE, SINK_TOPIC);
-        source.process(new ProcessorSupplier<Long, byte[]>() {
+        source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
+        source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
-            public Processor<Long, byte[]> get() {
-                return new Processor<Long, byte[]>() {
+            public Processor<Integer, byte[]> get() {
+                return new Processor<Integer, byte[]>() {
 
                     @Override
                     public void init(ProcessorContext context) {
                     }
 
                     @Override
-                    public void process(Long key, byte[] value) {
-                        if (END_KEY.equals(key)) {
+                    public void process(Integer key, byte[] value) {
+                        if (endKey.equals(key)) {
                             latch.countDown();
                         }
                     }
@@ -353,8 +506,56 @@ public class SimpleBenchmark {
         return new KafkaStreams(builder, props);
     }
 
+    private class CountDownAction<K, V> implements ForeachAction<K, V> {
+        private CountDownLatch latch;
+        CountDownAction(final CountDownLatch latch) {
+            this.latch = latch;
+        }
+        @Override
+        public void apply(K key, V value) {
+            this.latch.countDown();
+        }
+    }
+
+    private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties streamConfig, String kStreamTopic,
+                                                             String kTableTopic, final CountDownLatch latch) {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic);
+        final KTable<Long, byte[]> input2 = builder.table(kTableTopic, kTableTopic + "-store");
+
+        input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+
+    private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
+                                                            String kTableTopic2, final CountDownLatch latch) {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KTable<Long, byte[]> input1 = builder.table(kTableTopic1, kTableTopic1 + "-store");
+        final KTable<Long, byte[]> input2 = builder.table(kTableTopic2, kTableTopic2 + "-store");
+
+        input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+
+    private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
+                                                              String kStreamTopic2, final CountDownLatch latch) {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic1);
+        final KStream<Long, byte[]> input2 = builder.stream(kStreamTopic2);
+        final long timeDifferenceMs = 10000L;
 
-    private KafkaStreams createKafkaStreamsWithStateStore(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+        input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(timeDifferenceMs)).foreach(new CountDownAction(latch));
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+
+    private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String kafka, String zookeeper,
+                                                          final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -365,28 +566,28 @@ public class SimpleBenchmark {
 
         KStreamBuilder builder = new KStreamBuilder();
 
-        builder.addStateStore(Stores.create("store").withLongKeys().withByteArrayValues().persistent().build());
+        builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
 
-        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE,
topic);
 
-        source.process(new ProcessorSupplier<Long, byte[]>() {
+        source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
-            public Processor<Long, byte[]> get() {
-                return new Processor<Long, byte[]>() {
+            public Processor<Integer, byte[]> get() {
+                return new Processor<Integer, byte[]>() {
 
-                    KeyValueStore<Long, byte[]> store;
+                    KeyValueStore<Integer, byte[]> store;
 
                     @SuppressWarnings("unchecked")
                     @Override
                     public void init(ProcessorContext context) {
-                        store = (KeyValueStore<Long, byte[]>) context.getStateStore("store");
+                        store = (KeyValueStore<Integer, byte[]>) context.getStateStore("store");
                     }
 
                     @Override
-                    public void process(Long key, byte[] value) {
+                    public void process(Integer key, byte[] value) {
                         store.put(key, value);
 
-                        if (END_KEY.equals(key)) {
+                        if (endKey.equals(key)) {
                             latch.countDown();
                         }
                     }
@@ -406,7 +607,11 @@ public class SimpleBenchmark {
     }
 
     private double megaBytePerSec(long time) {
-        return (double) (RECORD_SIZE * NUM_RECORDS / 1024 / 1024) / ((double) time / 1000);
+        return (double) (RECORD_SIZE * numRecords / 1024 / 1024) / ((double) time / 1000);
+    }
+
+    private double megaBytePerSec(long time, int numRecords, int recordSizeBytes) {
+        return (double) (recordSizeBytes * numRecords / 1024 / 1024) / ((double) time / 1000);
     }
 
     private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer,
String... topics) {
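
A note on the seeding contract that the join javadocs above rely on: produce() writes numRecords messages whose Integer keys are either sequential (0 .. numRecords-1) or drawn uniformly from [0, upperRange). For the KStream-KTable benchmark the table topic is seeded sequentially and the stream topic randomly over the same range, so every stream key matches exactly one table row; the KStream-KStream and KTable-KTable benchmarks seed both topics sequentially. A condensed sketch of that seeding logic, assuming a local broker; the class name, topic names, and bootstrap address are illustrative:

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

public class SeedTopicSketch {

    /** Write numRecords values; sequential=true yields keys 0 .. numRecords-1. */
    static void seed(String topic, int numRecords, int upperRange,
                     boolean sequential, int valueSizeBytes) {
        if (sequential && upperRange < numRecords)
            throw new IllegalArgumentException("upperRange must be >= numRecords");

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        Random rand = new Random();
        byte[] value = new byte[valueSizeBytes];

        try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < numRecords; i++) {
                // Sequential keys enumerate the range; random keys sample it.
                int key = sequential ? i : rand.nextInt(upperRange);
                producer.send(new ProducerRecord<>(topic, key, value));
            }
        }
    }

    public static void main(String[] args) {
        // KStream-KTable setup: random stream keys over the table's sequential key range.
        seed("joinSourceTopic1kStreamKTable", 1000000, 1000000, false, 100);
        seed("joinSourceTopic2kStreamKTable", 1000000, 1000000, true, 100);
    }
}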

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5d26c48/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 5eb2663..de687e6 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -27,7 +27,7 @@ class StreamsSimpleBenchmarkTest(KafkaTest):
     def __init__(self, test_context):
         super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1)
 
-        self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka)
+        self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
 
     def test_simple_benchmark(self):
         """

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5d26c48/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 289bccb..b7d6b89 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -47,9 +47,10 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, context, kafka):
+    def __init__(self, context, kafka, numrecs):
         super(StreamsSimpleBenchmarkService, self).__init__(context, 1)
         self.kafka = kafka
+        self.numrecs = numrecs
 
     @property
     def node(self):
@@ -88,6 +89,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
         args['kafka'] = self.kafka.bootstrap_servers()
         args['zk'] = self.kafka.zk.connect_setting()
         args['state_dir'] = self.PERSISTENT_ROOT
+        args['numrecs'] = self.numrecs
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
         args['pidfile'] = self.PID_FILE
@@ -96,7 +98,7 @@ class StreamsSimpleBenchmarkService(KafkaPathResolverMixin, Service):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.perf.SimpleBenchmark
" \
-              " %(kafka)s %(zk)s %(state_dir)s " \
+              " %(kafka)s %(zk)s %(state_dir)s %(numrecs)s " \
               " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3>
%(pidfile)s" % args
 
         return cmd
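
The new fourth positional argument rendered by this template lines up with the parsing added to SimpleBenchmark.main(), where numrecs arrives as args[3] and falls back to 10000000 when absent. A tiny illustrative sketch of that contract; the hard-coded array stands in for what the service template renders:

public class ArgsContractSketch {
    public static void main(String[] ignored) {
        // Stand-in for the rendered template: %(kafka)s %(zk)s %(state_dir)s %(numrecs)s
        String[] args = {"localhost:9092", "localhost:2181",
                         "/tmp/kafka-streams-simple-benchmark", "1000000"};

        // Mirrors the parsing in SimpleBenchmark.main()
        String kafka = args.length > 0 ? args[0] : "localhost:9092";
        String zookeeper = args.length > 1 ? args[1] : "localhost:2181";
        String stateDir = args.length > 2 ? args[2] : "/tmp/kafka-streams-simple-benchmark";
        int numRecords = args.length > 3 ? Integer.parseInt(args[3]) : 10000000;

        System.out.println("kafka=" + kafka + " zookeeper=" + zookeeper
                + " stateDir=" + stateDir + " numRecords=" + numRecords);
    }
}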

