kafka-commits mailing list archives

From gwens...@apache.org
Subject [07/50] [abbrv] kafka git commit: MINOR: a simple benchmark for Streams
Date Mon, 11 Apr 2016 23:09:23 GMT
MINOR: a simple benchmark for Streams

guozhangwang miguno

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1164 from ymatsuda/perf


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2788f2dc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2788f2dc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2788f2dc

Branch: refs/heads/0.10.0
Commit: 2788f2dc73dd62c35c87842d02d23a0ed6a62e97
Parents: aaafd23
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Wed Mar 30 14:26:01 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Mar 30 14:26:01 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/StreamTask.java |   4 +-
 .../kafka/streams/perf/SimpleBenchmark.java     | 413 +++++++++++++++++++
 2 files changed, 415 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
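Each phase of the benchmark below reports throughput in MB/sec, derived from a fixed record layout (8-byte key plus 100-byte value) and a fixed record count (10 million). A minimal sketch of that calculation, mirroring the megaBytePerSec formula in the diff (the class name here is hypothetical):

    public class ThroughputMath {
        // Constants mirrored from SimpleBenchmark below.
        static final long NUM_RECORDS = 10000000L;
        static final int RECORD_SIZE = 8 + 100; // KEY_SIZE + VALUE_SIZE

        // Total megabytes transferred divided by elapsed seconds; the
        // integer division down to MB matches the benchmark's own formula.
        static double megaBytePerSec(long elapsedMs) {
            return (double) (RECORD_SIZE * NUM_RECORDS / 1024 / 1024) / ((double) elapsedMs / 1000);
        }

        public static void main(String[] args) {
            // ~1029 MB of payload; a 60-second run works out to ~17 MB/sec.
            System.out.println(megaBytePerSec(60000L));
        }
    }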


http://git-wip-us.apache.org/repos/asf/kafka/blob/2788f2dc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c4cc2ba..afa303c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -167,11 +167,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 this.currNode = recordInfo.node();
                 TopicPartition partition = recordInfo.partition();
 
-                log.debug("Start processing one record [" + currRecord + "]");
+                log.debug("Start processing one record [{}]", currRecord);
 
                 this.currNode.process(currRecord.key(), currRecord.value());
 
-                log.debug("Completed processing one record [" + currRecord + "]");
+                log.debug("Completed processing one record [{}]", currRecord);
 
                 // update the consumed offset map after processing is done
                 consumedOffsets.put(partition, currRecord.offset());
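The StreamTask change above swaps string concatenation for SLF4J's parameterized form, so currRecord.toString() is only evaluated when DEBUG logging is actually enabled, keeping the hot processing path cheap. A self-contained illustration of the idiom, assuming slf4j-api on the classpath (the class name is hypothetical; the messages mirror the hunk):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingIdiom {
        private static final Logger log = LoggerFactory.getLogger(LoggingIdiom.class);

        public static void main(String[] args) {
            Object currRecord = "key=1, value=...";

            // Before: the message string is built eagerly, even when DEBUG is off.
            log.debug("Start processing one record [" + currRecord + "]");

            // After: the {} placeholder defers formatting until the logger has
            // checked that DEBUG is enabled, avoiding the concatenation cost.
            log.debug("Start processing one record [{}]", currRecord);
        }
    }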

http://git-wip-us.apache.org/repos/asf/kafka/blob/2788f2dc/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
new file mode 100644
index 0000000..c883090
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.perf;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.Properties;
+
+public class SimpleBenchmark {
+
+    private final String kafka;
+    private final String zookeeper;
+    private final File stateDir;
+
+    private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
+    private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
+
+    private static final long NUM_RECORDS = 10000000L;
+    private static final Long END_KEY = NUM_RECORDS - 1;
+    private static final int KEY_SIZE = 8;
+    private static final int VALUE_SIZE = 100;
+    private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
+
+    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
+    private static final Serde<Long> LONG_SERDE = Serdes.Long();
+
+    public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
+        super();
+        this.stateDir = stateDir;
+        this.kafka = kafka;
+        this.zookeeper = zookeeper;
+    }
+
+    public static void main(String[] args) throws Exception {
+        final File stateDir = new File("/tmp/kafka-streams-simple-benchmark");
+        stateDir.mkdir();
+
+        final File rocksdbDir = new File(stateDir, "rocksdb-test");
+        rocksdbDir.mkdir();
+
+
+        final String kafka = "localhost:9092";
+        final String zookeeper = "localhost:2181";
+
+        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
+
+        // producer performance
+        benchmark.produce();
+        // consumer performance
+        benchmark.consume();
+        // simple stream performance source->process
+        benchmark.processStream();
+        // simple stream performance source->sink
+        benchmark.processStreamWithSink();
+        // simple stream performance source->store
+        benchmark.processStreamWithStateStore();
+    }
+
+    public void processStream() {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        final KafkaStreams streams = createKafkaStreams(stateDir, kafka, zookeeper, latch);
+
+        Thread thread = new Thread() {
+            public void run() {
+                streams.start();
+            }
+        };
+        thread.start();
+
+        long startTime = System.currentTimeMillis();
+
+        while (latch.getCount() > 0) {
+            try {
+                latch.await();
+            } catch (InterruptedException ex) {
+                Thread.interrupted();
+            }
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        System.out.println("Streams Performance [MB/sec read]: " + megaBytePerSec(endTime
- startTime));
+
+        streams.close();
+        try {
+            thread.join();
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+
+    public void processStreamWithSink() {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        final KafkaStreams streams = createKafkaStreamsWithSink(stateDir, kafka, zookeeper, latch);
+
+        Thread thread = new Thread() {
+            public void run() {
+                streams.start();
+            }
+        };
+        thread.start();
+
+        long startTime = System.currentTimeMillis();
+
+        while (latch.getCount() > 0) {
+            try {
+                latch.await();
+            } catch (InterruptedException ex) {
+                Thread.interrupted();
+            }
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        System.out.println("Streams Performance [MB/sec read+write]: " + megaBytePerSec(endTime
- startTime));
+
+        streams.close();
+        try {
+            thread.join();
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+
+    public void processStreamWithStateStore() {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(stateDir, kafka, zookeeper, latch);
+
+        Thread thread = new Thread() {
+            public void run() {
+                streams.start();
+            }
+        };
+        thread.start();
+
+        long startTime = System.currentTimeMillis();
+
+        while (latch.getCount() > 0) {
+            try {
+                latch.await();
+            } catch (InterruptedException ex) {
+                Thread.interrupted();
+            }
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        System.out.println("Streams Performance [MB/sec read+store]: " + megaBytePerSec(endTime
- startTime));
+
+        streams.close();
+        try {
+            thread.join();
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+
+    public void produce() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "simple-benchmark-produce");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);
+
+        byte[] value = new byte[VALUE_SIZE];
+        long startTime = System.currentTimeMillis();
+
+        for (int i = 0; i < NUM_RECORDS; i++) {
+            producer.send(new ProducerRecord<>(SOURCE_TOPIC, (long) i, value));
+        }
+        producer.close();
+
+        long endTime = System.currentTimeMillis();
+
+        System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime
- startTime));
+    }
+
+    public void consume() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+
+        KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
+
+        List<TopicPartition> partitions = getAllPartitions(consumer, SOURCE_TOPIC);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+
+        Long key = null;
+
+        long startTime = System.currentTimeMillis();
+
+        while (true) {
+            ConsumerRecords<Long, byte[]> records = consumer.poll(500);
+            if (records.isEmpty()) {
+                if (END_KEY.equals(key))
+                    break;
+            } else {
+                for (ConsumerRecord<Long, byte[]> record : records) {
+                    key = record.key();
+                }
+            }
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime
- startTime));
+    }
+
+    private KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+
+        source.process(new ProcessorSupplier<Long, byte[]>() {
+            @Override
+            public Processor<Long, byte[]> get() {
+                return new Processor<Long, byte[]>() {
+
+                    @Override
+                    public void init(ProcessorContext context) {
+                    }
+
+                    @Override
+                    public void process(Long key, byte[] value) {
+                        if (END_KEY.equals(key)) {
+                            latch.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(long timestamp) {
+                    }
+
+                    @Override
+                    public void close() {
+                    }
+                };
+            }
+        });
+
+        return new KafkaStreams(builder, props);
+    }
+
+    private KafkaStreams createKafkaStreamsWithSink(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+
+        source.to(LONG_SERDE, BYTE_SERDE, SINK_TOPIC);
+        source.process(new ProcessorSupplier<Long, byte[]>() {
+            @Override
+            public Processor<Long, byte[]> get() {
+                return new Processor<Long, byte[]>() {
+
+                    @Override
+                    public void init(ProcessorContext context) {
+                    }
+
+                    @Override
+                    public void process(Long key, byte[] value) {
+                        if (END_KEY.equals(key)) {
+                            latch.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(long timestamp) {
+                    }
+
+                    @Override
+                    public void close() {
+                    }
+                };
+            }
+        });
+
+        return new KafkaStreams(builder, props);
+    }
+
+
+    private KafkaStreams createKafkaStreamsWithStateStore(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store");
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        builder.addStateStore(Stores.create("store").withLongKeys().withByteArrayValues().persistent().build());
+
+        KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
+
+        source.process(new ProcessorSupplier<Long, byte[]>() {
+            @Override
+            public Processor<Long, byte[]> get() {
+                return new Processor<Long, byte[]>() {
+
+                    KeyValueStore<Long, byte[]> store;
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(ProcessorContext context) {
+                        store = (KeyValueStore<Long, byte[]>) context.getStateStore("store");
+                    }
+
+                    @Override
+                    public void process(Long key, byte[] value) {
+                        store.put(key, value);
+
+                        if (END_KEY.equals(key)) {
+                            latch.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(long timestamp) {
+                    }
+
+                    @Override
+                    public void close() {
+                    }
+                };
+            }
+        }, "store");
+
+        return new KafkaStreams(builder, props);
+    }
+
+    private double megaBytePerSec(long time) {
+        return (double) (RECORD_SIZE * NUM_RECORDS / 1024 / 1024) / ((double) time / 1000);
+    }
+
+    private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
+        ArrayList<TopicPartition> partitions = new ArrayList<>();
+
+        for (String topic : topics) {
+            for (PartitionInfo info : consumer.partitionsFor(topic)) {
+                partitions.add(new TopicPartition(info.topic(), info.partition()));
+            }
+        }
+        return partitions;
+    }
+
+}
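All three processStream* variants time the topology the same way: a background thread starts the KafkaStreams instance, and the embedded Processor counts down a CountDownLatch when it observes END_KEY (the last key the producer wrote), which unblocks the timing loop in the caller. A stripped-down sketch of that completion signal, with the Streams machinery replaced by a plain loop (all names here are hypothetical):

    import java.util.concurrent.CountDownLatch;

    public class EndKeySignal {
        public static void main(String[] args) throws InterruptedException {
            final long endKey = 9999999L; // END_KEY = NUM_RECORDS - 1
            final CountDownLatch latch = new CountDownLatch(1);

            // Stand-in for the Streams thread; in the benchmark the countdown
            // happens inside Processor.process() when the last key arrives.
            Thread processing = new Thread() {
                public void run() {
                    for (long key = 0; key <= endKey; key++) {
                        if (key == endKey)
                            latch.countDown(); // signal: all records observed
                    }
                }
            };

            long startTime = System.currentTimeMillis();
            processing.start();
            latch.await(); // caller blocks until the end key is seen
            long endTime = System.currentTimeMillis();
            System.out.println("elapsed ms: " + (endTime - startTime));
            processing.join();
        }
    }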

