kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvcep...@apache.org
Subject [kafka] branch 2.4 updated: MINOR: remove stream simple benchmark suite (#8353)
Date Mon, 05 Oct 2020 20:33:41 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new b165385  MINOR: remove stream simple benchmark suite (#8353)
b165385 is described below

commit b16538553f90904472c41e549aeee821501f592e
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Tue Apr 14 09:49:03 2020 -0700

    MINOR: remove stream simple benchmark suite (#8353)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>
---
 .../apache/kafka/streams/perf/SimpleBenchmark.java | 752 ---------------------
 .../apache/kafka/streams/perf/YahooBenchmark.java  | 306 ---------
 tests/kafkatest/benchmarks/streams/__init__.py     |  14 -
 .../streams/streams_simple_benchmark_test.py       | 164 -----
 .../services/performance/streams_performance.py    | 108 ---
 5 files changed, 1344 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
deleted file mode 100644
index 4a14b87..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.perf;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.WindowStore;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static java.time.Duration.ofMillis;
-import static java.time.Duration.ofSeconds;
-import static java.time.Instant.ofEpochMilli;
-
-/**
- * Class that provides support for a series of benchmarks. It is usually driven by
- * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
- * If run manually through the main() function below, you must do the following:
- * 1. Have ZK and a Kafka broker set up
- * 2. Run the loading step first: SimpleBenchmark localhost:9092 /tmp/statedir numRecords true "all"
- * 3. Run the stream processing step second: SimpleBenchmark localhost:9092 /tmp/statedir numRecords false "all"
- * Note that what changed is the 4th parameter, from "true" indicating that this is a load phase, to "false" indicating
- * that this is a real run.
- *
- * Note that "all" is a convenience option when running this test locally and will not work when running the test
- * at scale (through tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py). That is due to exact synchronization
- * needs for each test (e.g., you wouldn't want one instance to run "count" while another
- * is still running "consume").
- */
-public class SimpleBenchmark {
-    private static final String LOADING_PRODUCER_CLIENT_ID = "simple-benchmark-loading-producer";
-
-    private static final String SOURCE_TOPIC_ONE = "simpleBenchmarkSourceTopic1";
-    private static final String SOURCE_TOPIC_TWO = "simpleBenchmarkSourceTopic2";
-    private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
-
-    private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
-    private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
-
-    private static final ValueJoiner<byte[], byte[], byte[]> VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
-        @Override
-        public byte[] apply(final byte[] value1, final byte[] value2) {
-            // dump joiner in order to have as less join overhead as possible
-            if (value1 != null) {
-                return value1;
-            } else if (value2 != null) {
-                return value2;
-            } else {
-                return new byte[100];
-            }
-        }
-    };
-
-    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
-    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
-
-    long processedBytes = 0L;
-    int processedRecords = 0;
-
-    private static final long POLL_MS = 500L;
-    private static final long COMMIT_INTERVAL_MS = 30000L;
-    private static final int MAX_POLL_RECORDS = 1000;
-
-    /* ----------- benchmark variables that are hard-coded ----------- */
-
-    private static final int KEY_SPACE_SIZE = 10000;
-
-    private static final long STREAM_STREAM_JOIN_WINDOW = 10000L;
-
-    private static final long AGGREGATE_WINDOW_SIZE = 1000L;
-
-    private static final long AGGREGATE_WINDOW_ADVANCE = 500L;
-
-    private static final int SOCKET_SIZE_BYTES = 1024 * 1024;
-
-    // the following numbers are based on empirical results and should only
-    // be considered for updates when perf results have significantly changed
-
-    // with at least 10 million records, we run for at most 3 minutes
-    private static final int MAX_WAIT_MS = 3 * 60 * 1000;
-
-    /* ----------- benchmark variables that can be specified ----------- */
-
-    final String testName;
-
-    final int numRecords;
-
-    final Properties props;
-
-    private final int valueSize;
-
-    private final double keySkew;
-
-    /* ----------- ----------------------------------------- ----------- */
-
-
-    private SimpleBenchmark(final Properties props,
-                            final String testName,
-                            final int numRecords,
-                            final double keySkew,
-                            final int valueSize) {
-        super();
-        this.props = props;
-        this.testName = testName;
-        this.keySkew = keySkew;
-        this.valueSize = valueSize;
-        this.numRecords = numRecords;
-    }
-
-    private void run() {
-        switch (testName) {
-            // loading phases
-            case "load-one":
-                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, numRecords, keySkew, valueSize);
-                break;
-            case "load-two":
-                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, numRecords, keySkew, valueSize);
-                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_TWO, numRecords, keySkew, valueSize);
-                break;
-
-            // testing phases
-            case "consume":
-                consume(SOURCE_TOPIC_ONE);
-                break;
-            case "consumeproduce":
-                consumeAndProduce(SOURCE_TOPIC_ONE);
-                break;
-            case "streamcount":
-                countStreamsNonWindowed(SOURCE_TOPIC_ONE);
-                break;
-            case "streamcountwindowed":
-                countStreamsWindowed(SOURCE_TOPIC_ONE);
-                break;
-            case "streamprocess":
-                processStream(SOURCE_TOPIC_ONE);
-                break;
-            case "streamprocesswithsink":
-                processStreamWithSink(SOURCE_TOPIC_ONE);
-                break;
-            case "streamprocesswithstatestore":
-                processStreamWithStateStore(SOURCE_TOPIC_ONE);
-                break;
-            case "streamprocesswithwindowstore":
-                processStreamWithWindowStore(SOURCE_TOPIC_ONE);
-                break;
-            case "streamtablejoin":
-                streamTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
-                break;
-            case "streamstreamjoin":
-                streamStreamJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
-                break;
-            case "tabletablejoin":
-                tableTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
-                break;
-            case "yahoo":
-                yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
-                break;
-            default:
-                throw new RuntimeException("Unknown test name " + testName);
-
-        }
-    }
-
-    public static void main(final String[] args) throws IOException {
-        if (args.length < 5) {
-            System.err.println("Not enough parameters are provided; expecting propFileName, testName, numRecords, keySkew, valueSize");
-            System.exit(1);
-        }
-
-        final String propFileName = args[0];
-        final String testName = args[1].toLowerCase(Locale.ROOT);
-        final int numRecords = Integer.parseInt(args[2]);
-        final double keySkew = Double.parseDouble(args[3]); // 0d means even distribution
-        final int valueSize = Integer.parseInt(args[4]);
-
-        final Properties props = Utils.loadProps(propFileName);
-        final String kafka = props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-
-        if (kafka == null) {
-            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-            System.exit(1);
-        }
-
-        // Note: this output is needed for automated tests and must not be removed
-        System.out.println("StreamsTest instance started");
-
-        System.out.println("testName=" + testName);
-        System.out.println("streamsProperties=" + props);
-        System.out.println("numRecords=" + numRecords);
-        System.out.println("keySkew=" + keySkew);
-        System.out.println("valueSize=" + valueSize);
-
-        final SimpleBenchmark benchmark = new SimpleBenchmark(props, testName, numRecords, keySkew, valueSize);
-
-        benchmark.run();
-    }
-
-    public void setStreamProperties(final String applicationId) {
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-benchmark");
-        props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
-        // the socket buffer needs to be large, especially when running in AWS with
-        // high latency. if running locally the default is fine.
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
-
-        // improve producer throughput
-        props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
-        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
-    }
-
-    private Properties setProduceConsumeProperties(final String clientId) {
-        final Properties clientProps = new Properties();
-        clientProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
-        clientProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-        // the socket buffer needs to be large, especially when running in AWS with
-        // high latency. if running locally the default is fine.
-        clientProps.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
-        clientProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
-        clientProps.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
-        clientProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        clientProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        clientProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        clientProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        clientProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        // the socket buffer needs to be large, especially when running in AWS with
-        // high latency. if running locally the default is fine.
-        clientProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
-        clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
-        return clientProps;
-    }
-
-    void resetStats() {
-        processedRecords = 0;
-        processedBytes = 0L;
-    }
-
-    /**
-     * Produce values to a topic
-     * @param clientId String specifying client ID
-     * @param topic Topic to produce to
-     * @param numRecords Number of records to produce
-     * @param keySkew Key zipf distribution skewness
-     * @param valueSize Size of value in bytes
-     */
-    private void produce(final String clientId,
-                         final String topic,
-                         final int numRecords,
-                         final double keySkew,
-                         final int valueSize) {
-        final Properties props = setProduceConsumeProperties(clientId);
-        final ZipfGenerator keyGen = new ZipfGenerator(KEY_SPACE_SIZE, keySkew);
-
-        try (final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props)) {
-            final byte[] value = new byte[valueSize];
-            // put some random values to increase entropy. Some devices
-            // like SSDs do compression and if the array is all zeros
-            // the performance will be too good.
-            new Random(System.currentTimeMillis()).nextBytes(value);
-
-            for (int i = 0; i < numRecords; i++) {
-                producer.send(new ProducerRecord<>(topic, keyGen.next(), value));
-            }
-        }
-    }
-
-    private void consumeAndProduce(final String topic) {
-        final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
-        final Properties producerProps = setProduceConsumeProperties("simple-benchmark-producer");
-
-        final long startTime = System.currentTimeMillis();
-        try (final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
-             final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
-            final List<TopicPartition> partitions = getAllPartitions(consumer, topic);
-
-            consumer.assign(partitions);
-            consumer.seekToBeginning(partitions);
-
-            while (true) {
-                final ConsumerRecords<Integer, byte[]> records = consumer.poll(ofMillis(POLL_MS));
-                if (records.isEmpty()) {
-                    if (processedRecords == numRecords) {
-                        break;
-                    }
-                } else {
-                    for (final ConsumerRecord<Integer, byte[]> record : records) {
-                        producer.send(new ProducerRecord<>(SINK_TOPIC, record.key(), record.value()));
-                        processedRecords++;
-                        processedBytes += record.value().length + Integer.SIZE;
-                        if (processedRecords == numRecords) {
-                            break;
-                        }
-                    }
-                }
-                if (processedRecords == numRecords) {
-                    break;
-                }
-            }
-        }
-
-        final long endTime = System.currentTimeMillis();
-
-        printResults("ConsumerProducer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
-    }
-
-    private void consume(final String topic) {
-        final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
-
-        final long startTime = System.currentTimeMillis();
-
-        try (final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
-            final List<TopicPartition> partitions = getAllPartitions(consumer, topic);
-
-            consumer.assign(partitions);
-            consumer.seekToBeginning(partitions);
-
-            while (true) {
-                final ConsumerRecords<Integer, byte[]> records = consumer.poll(ofMillis(POLL_MS));
-                if (records.isEmpty()) {
-                    if (processedRecords == numRecords) {
-                        break;
-                    }
-                } else {
-                    for (final ConsumerRecord<Integer, byte[]> record : records) {
-                        processedRecords++;
-                        processedBytes += record.value().length + Integer.SIZE;
-                        if (processedRecords == numRecords) {
-                            break;
-                        }
-                    }
-                }
-                if (processedRecords == numRecords) {
-                    break;
-                }
-            }
-        }
-
-        final long endTime = System.currentTimeMillis();
-
-        printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
-    }
-
-    private void processStream(final String topic) {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        setStreamProperties("simple-benchmark-streams-source");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE)).peek(new CountDownAction(latch));
-
-        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-        runGenericBenchmark(streams, "Streams Source Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
-    }
-
-    private void processStreamWithSink(final String topic) {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        setStreamProperties("simple-benchmark-streams-source-sink");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<Integer, byte[]> source = builder.stream(topic);
-        source.peek(new CountDownAction(latch)).to(SINK_TOPIC);
-
-        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-        runGenericBenchmark(streams, "Streams SourceSink Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
-    }
-
-    private void processStreamWithStateStore(final String topic) {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        setStreamProperties("simple-benchmark-streams-with-store");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder =
-            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), INTEGER_SERDE, BYTE_SERDE);
-        builder.addStateStore(storeBuilder.withCachingEnabled());
-
-        final KStream<Integer, byte[]> source = builder.stream(topic);
-
-        source.peek(new CountDownAction(latch)).process(new ProcessorSupplier<Integer, byte[]>() {
-            @Override
-            public Processor<Integer, byte[]> get() {
-                return new AbstractProcessor<Integer, byte[]>() {
-                    KeyValueStore<Integer, byte[]> store;
-
-                    @SuppressWarnings("unchecked")
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        super.init(context);
-                        store = (KeyValueStore<Integer, byte[]>) context.getStateStore("store");
-                    }
-
-                    @Override
-                    public void process(final Integer key, final byte[] value) {
-                        store.get(key);
-                        store.put(key, value);
-                    }
-                };
-            }
-        }, "store");
-
-        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-        runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
-    }
-
-    private void processStreamWithWindowStore(final String topic) {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        setStreamProperties("simple-benchmark-streams-with-store");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(
-                "store",
-                ofMillis(AGGREGATE_WINDOW_SIZE * 3),
-                ofMillis(AGGREGATE_WINDOW_SIZE),
-                false
-            ),
-            INTEGER_SERDE,
-            BYTE_SERDE
-        );
-        builder.addStateStore(storeBuilder.withCachingEnabled());
-
-        final KStream<Integer, byte[]> source = builder.stream(topic);
-
-        source.peek(new CountDownAction(latch)).process(new ProcessorSupplier<Integer, byte[]>() {
-            @Override
-            public Processor<Integer, byte[]> get() {
-                return new AbstractProcessor<Integer, byte[]>() {
-                    WindowStore<Integer, byte[]> store;
-
-                    @SuppressWarnings("unchecked")
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        super.init(context);
-                        store = (WindowStore<Integer, byte[]>) context.getStateStore("store");
-                    }
-
-                    @Override
-                    public void process(final Integer key, final byte[] value) {
-                        final long timestamp = context().timestamp();
-                        final KeyValueIterator<Windowed<Integer>, byte[]> iter = store.fetch(key - 10, key + 10, ofEpochMilli(timestamp - 1000L), ofEpochMilli(timestamp));
-                        while (iter.hasNext()) {
-                            iter.next();
-                        }
-                        iter.close();
-
-                        store.put(key, value, timestamp);
-                    }
-                };
-            }
-        }, "store");
-
-        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-        runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
-    }
-
-    /**
-     * Measure the performance of a simple aggregate like count.
-     * Counts the occurrence of numbers (note that normally people count words, this
-     * example counts numbers)
-     */
-    private void countStreamsNonWindowed(final String sourceTopic) {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        setStreamProperties("simple-benchmark-nonwindowed-count");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
-
-        input.peek(new CountDownAction(latch))
-                .groupByKey()
-                .count();
-
-        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-        runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
-    }
-
-    /**
-     * Measure the performance of a simple aggregate like count.
-     * Counts the occurrence of numbers (note that normally people count words, this
-     * example counts numbers)
-     */
-    private void countStreamsWindowed(final String sourceTopic) {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        setStreamProperties("simple-benchmark-windowed-count");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
-
-        input.peek(new CountDownAction(latch))
-                .groupByKey()
-                .windowedBy(TimeWindows.of(ofMillis(AGGREGATE_WINDOW_SIZE)).advanceBy(ofMillis(AGGREGATE_WINDOW_ADVANCE)))
-                .count();
-
-        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-        runGenericBenchmark(streams, "Streams Count Windowed Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
-    }
-
-    /**
-     * Measure the performance of a KStream-KTable left join. The setup is such that each
-     * KStream record joins to exactly one element in the KTable
-     */
-    private void streamTableJoin(final String kStreamTopic, final String kTableTopic) {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        setStreamProperties("simple-benchmark-stream-table-join");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic);
-        final KTable<Integer, byte[]> input2 = builder.table(kTableTopic);
-
-        input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
-
-        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-
-        // run benchmark
-        runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
-    }
-
-    /**
-     * Measure the performance of a KStream-KStream left join. The setup is such that each
-     * KStream record joins to exactly one element in the other KStream
-     */
-    private void streamStreamJoin(final String kStreamTopic1, final String kStreamTopic2) {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        setStreamProperties("simple-benchmark-stream-stream-join");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic1);
-        final KStream<Integer, byte[]> input2 = builder.stream(kStreamTopic2);
-
-        input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(ofMillis(STREAM_STREAM_JOIN_WINDOW))).foreach(new CountDownAction(latch));
-
-        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-
-        // run benchmark
-        runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec  joined]: ", latch);
-    }
-
-    /**
-     * Measure the performance of a KTable-KTable left join. The setup is such that each
-     * KTable record joins to exactly one element in the other KTable
-     */
-    private void tableTableJoin(final String kTableTopic1, final String kTableTopic2) {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        // setup join
-        setStreamProperties("simple-benchmark-table-table-join");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KTable<Integer, byte[]> input1 = builder.table(kTableTopic1);
-        final KTable<Integer, byte[]> input2 = builder.table(kTableTopic2);
-
-        input1.leftJoin(input2, VALUE_JOINER).toStream().foreach(new CountDownAction(latch));
-
-        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-
-        // run benchmark
-        runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
-    }
-
-    void printResults(final String nameOfBenchmark, final long latency) {
-        System.out.println(nameOfBenchmark +
-            processedRecords + "/" +
-            latency + "/" +
-            recordsPerSec(latency, processedRecords) + "/" +
-            megabytesPerSec(latency, processedBytes));
-    }
-
-    void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
-        streams.start();
-
-        final long startTime = System.currentTimeMillis();
-        long endTime = startTime;
-
-        while (latch.getCount() > 0 && (endTime - startTime < MAX_WAIT_MS)) {
-            try {
-                latch.await(1000, TimeUnit.MILLISECONDS);
-            } catch (final InterruptedException ex) {
-                Thread.interrupted();
-            }
-
-            endTime = System.currentTimeMillis();
-        }
-        streams.close();
-
-        printResults(nameOfBenchmark, endTime - startTime);
-    }
-
-    private class CountDownAction implements ForeachAction<Integer, byte[]> {
-        private final CountDownLatch latch;
-
-        CountDownAction(final CountDownLatch latch) {
-            this.latch = latch;
-        }
-
-        @Override
-        public void apply(final Integer key, final byte[] value) {
-            processedRecords++;
-            processedBytes += Integer.SIZE + value.length;
-
-            if (processedRecords == numRecords) {
-                this.latch.countDown();
-            }
-        }
-    }
-
-    private KafkaStreams createKafkaStreamsWithExceptionHandler(final StreamsBuilder builder, final Properties props) {
-        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
-        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-
-                streamsClient.close(ofSeconds(30));
-            }
-        });
-
-        return streamsClient;
-    }
-    
-    private double megabytesPerSec(final long time, final long processedBytes) {
-        return  (processedBytes / 1024.0 / 1024.0) / (time / 1000.0);
-    }
-
-    private double recordsPerSec(final long time, final int numRecords) {
-        return numRecords / (time / 1000.0);
-    }
-
-    private List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
-        final ArrayList<TopicPartition> partitions = new ArrayList<>();
-
-        for (final String topic : topics) {
-            for (final PartitionInfo info : consumer.partitionsFor(topic)) {
-                partitions.add(new TopicPartition(info.topic(), info.partition()));
-            }
-        }
-        return partitions;
-    }
-
-    private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) {
-        final YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
-
-        benchmark.run();
-    }
-
-    private class ZipfGenerator {
-        final private Random rand = new Random(System.currentTimeMillis());
-        final private int size;
-        final private double skew;
-
-        private double bottom = 0.0d;
-
-        ZipfGenerator(final int size, final double skew) {
-            this.size = size;
-            this.skew = skew;
-
-            for (int i = 1; i < size; i++) {
-                this.bottom += 1.0d / Math.pow(i, this.skew);
-            }
-        }
-
-        int next() {
-            if (skew == 0.0d) {
-                return rand.nextInt(size);
-            } else {
-                int rank;
-                double dice;
-                double frequency;
-
-                rank = rand.nextInt(size);
-                frequency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
-                dice = rand.nextDouble();
-
-                while (!(dice < frequency)) {
-                    rank = rand.nextInt(size);
-                    frequency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
-                    dice = rand.nextDouble();
-                }
-
-                return rank;
-            }
-        }
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
deleted file mode 100644
index 2cab626..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.perf;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.Joined;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.TimeWindows;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-
-
-/**
- * A basic DSL and data generation that emulates the behavior of the Yahoo Benchmark
- * https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
- * Thanks to Michael Armbrust for providing the initial code for this benchmark in his blog:
- * https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html
- */
-public class YahooBenchmark {
-    private final SimpleBenchmark parent;
-    private final String campaignsTopic;
-    private final String eventsTopic;
-
-    static class ProjectedEvent {
-        /* attributes need to be public for serializer to work */
-        /* main attributes */
-        String eventType;
-        String adID;
-
-        /* other attributes */
-        long eventTime;
-        /* not used
-        public String userID = UUID.randomUUID().toString();
-        public String pageID = UUID.randomUUID().toString();
-        public String addType = "banner78";
-        public String ipAddress = "1.2.3.4";
-         */
-    }
-
-    static class CampaignAd {
-        /* attributes need to be public for serializer to work */
-        String adID;
-        String campaignID;
-    }
-
-    @SuppressWarnings("WeakerAccess")
-    public YahooBenchmark(final SimpleBenchmark parent, final String campaignsTopic, final String eventsTopic) {
-        this.parent = parent;
-        this.campaignsTopic = campaignsTopic;
-        this.eventsTopic = eventsTopic;
-    }
-
-    // just for Yahoo benchmark
-    private boolean maybeSetupPhaseCampaigns(final String topic,
-                                             final String clientId,
-                                             final boolean skipIfAllTests,
-                                             final int numCampaigns,
-                                             final int adsPerCampaign,
-                                             final List<String> ads) {
-        parent.resetStats();
-        // initialize topics
-        System.out.println("Initializing topic " + topic);
-
-        final Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-        try (final KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
-            for (int c = 0; c < numCampaigns; c++) {
-                final String campaignID = UUID.randomUUID().toString();
-                for (int a = 0; a < adsPerCampaign; a++) {
-                    final String adId = UUID.randomUUID().toString();
-                    final String concat = adId + ":" + campaignID;
-                    producer.send(new ProducerRecord<>(topic, adId, concat));
-                    ads.add(adId);
-                    parent.processedRecords++;
-                    parent.processedBytes += concat.length() + adId.length();
-                }
-            }
-        }
-        return true;
-    }
-
-    // just for Yahoo benchmark
-    private void maybeSetupPhaseEvents(final String topic,
-                                       final String clientId,
-                                       final int numRecords,
-                                       final List<String> ads) {
-        parent.resetStats();
-        final String[] eventTypes = new String[]{"view", "click", "purchase"};
-        final Random rand = new Random(System.currentTimeMillis());
-        System.out.println("Initializing topic " + topic);
-
-        final Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-
-        final long startTime = System.currentTimeMillis();
-
-        try (final KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
-            final ProjectedEvent event = new ProjectedEvent();
-            final Map<String, Object> serdeProps = new HashMap<>();
-            final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
-            serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
-            projectedEventSerializer.configure(serdeProps, false);
-
-            for (int i = 0; i < numRecords; i++) {
-                event.eventType = eventTypes[rand.nextInt(eventTypes.length - 1)];
-                event.adID = ads.get(rand.nextInt(ads.size() - 1));
-                event.eventTime = System.currentTimeMillis();
-                final byte[] value = projectedEventSerializer.serialize(topic, event);
-                producer.send(new ProducerRecord<>(topic, event.adID, value));
-                parent.processedRecords++;
-                parent.processedBytes += value.length + event.adID.length();
-            }
-        }
-
-        final long endTime = System.currentTimeMillis();
-
-        parent.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
-    }
-
-
-    public void run() {
-        final int numCampaigns = 100;
-        final int adsPerCampaign = 10;
-
-        final List<String> ads = new ArrayList<>(numCampaigns * adsPerCampaign);
-        maybeSetupPhaseCampaigns(campaignsTopic, "simple-benchmark-produce-campaigns", false, numCampaigns, adsPerCampaign, ads);
-        maybeSetupPhaseEvents(eventsTopic, "simple-benchmark-produce-events", parent.numRecords, ads);
-
-        final CountDownLatch latch = new CountDownLatch(1);
-        parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
-
-        final KafkaStreams streams = createYahooBenchmarkStreams(parent.props, campaignsTopic, eventsTopic, latch, parent.numRecords);
-        parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
-
-    }
-    // Note: these are also in the streams example package, eventually use 1 file
-    private class JsonPOJOSerializer<T> implements Serializer<T> {
-        private final ObjectMapper objectMapper = new ObjectMapper();
-
-        /**
-         * Default constructor needed by Kafka
-         */
-        @SuppressWarnings("WeakerAccess")
-        public JsonPOJOSerializer() {}
-
-        @Override
-        public byte[] serialize(final String topic, final T data) {
-            if (data == null) {
-                return null;
-            }
-
-            try {
-                return objectMapper.writeValueAsBytes(data);
-            } catch (final Exception e) {
-                throw new SerializationException("Error serializing JSON message", e);
-            }
-        }
-    }
-
-    // Note: these are also in the streams example package, eventually use 1 file
-    private class JsonPOJODeserializer<T> implements Deserializer<T> {
-        private final ObjectMapper objectMapper = new ObjectMapper();
-
-        private Class<T> tClass;
-
-        /**
-         * Default constructor needed by Kafka
-         */
-        @SuppressWarnings("WeakerAccess")
-        public JsonPOJODeserializer() {}
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void configure(final Map<String, ?> props, final boolean isKey) {
-            tClass = (Class<T>) props.get("JsonPOJOClass");
-        }
-
-        @Override
-        public T deserialize(final String topic, final byte[] bytes) {
-            if (bytes == null) {
-                return null;
-            }
-
-            final T data;
-            try {
-                data = objectMapper.readValue(bytes, tClass);
-            } catch (final Exception e) {
-                throw new SerializationException(e);
-            }
-
-            return data;
-        }
-    }
-
-    private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,
-                                                     final CountDownLatch latch, final int numRecords) {
-        final Map<String, Object> serdeProps = new HashMap<>();
-        final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
-        serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
-        projectedEventSerializer.configure(serdeProps, false);
-        final Deserializer<ProjectedEvent> projectedEventDeserializer = new JsonPOJODeserializer<>();
-        serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
-        projectedEventDeserializer.configure(serdeProps, false);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<String, ProjectedEvent> kEvents = builder.stream(eventsTopic,
-                                                                       Consumed.with(Serdes.String(),
-                                                                                     Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
-        final KTable<String, String> kCampaigns = builder.table(campaignsTopic, Consumed.with(Serdes.String(), Serdes.String()));
-
-        final KStream<String, ProjectedEvent> filteredEvents = kEvents
-            // use peek to count records and release the latch once the last element is processed
-            .peek((key, value) -> {
-                parent.processedRecords++;
-                if (parent.processedRecords % 1000000 == 0) {
-                    System.out.println("Processed " + parent.processedRecords);
-                }
-                if (parent.processedRecords >= numRecords) {
-                    latch.countDown();
-                }
-            })
-            // only keep "view" events
-            .filter((key, value) -> value.eventType.equals("view"))
-            // select just a few of the columns
-            .mapValues(value -> {
-                final ProjectedEvent event = new ProjectedEvent();
-                event.adID = value.adID;
-                event.eventTime = value.eventTime;
-                event.eventType = value.eventType;
-                return event;
-            });
-
-        // deserialize the ad ID and campaign ID from the stored value in Kafka
-        final KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(value -> {
-            final String[] parts = value.split(":");
-            final CampaignAd cAdd = new CampaignAd();
-            cAdd.adID = parts[0];
-            cAdd.campaignID = parts[1];
-            return cAdd;
-        });
-
-        // join the events with the campaigns
-        final KStream<String, String> joined = filteredEvents.join(
-            deserCampaigns,
-            (value1, value2) -> value2.campaignID,
-            Joined.with(Serdes.String(), Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), null)
-        );
-
-        // key by campaign rather than by ad as original
-        final KStream<String, String> keyedByCampaign = joined
-            .selectKey((key, value) -> value);
-
-        // calculate windowed counts
-        keyedByCampaign
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(Duration.ofMillis(10 * 1000)))
-            .count(Materialized.as("time-windows"));
-
-        return new KafkaStreams(builder.build(), streamConfig);
-    }
-}
diff --git a/tests/kafkatest/benchmarks/streams/__init__.py b/tests/kafkatest/benchmarks/streams/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/tests/kafkatest/benchmarks/streams/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
deleted file mode 100644
index 2f87f4a..0000000
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ /dev/null
@@ -1,164 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark.resource import cluster
-from ducktape.mark import parametrize, matrix
-from kafkatest.tests.kafka_test import KafkaTest
-
-from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.version import DEV_BRANCH
-
-STREAMS_SIMPLE_TESTS = ["streamprocess", "streamprocesswithsink", "streamprocesswithstatestore", "streamprocesswithwindowstore"]
-STREAMS_COUNT_TESTS = ["streamcount", "streamcountwindowed"]
-STREAMS_JOIN_TESTS = ["streamtablejoin", "streamstreamjoin", "tabletablejoin"]
-NON_STREAMS_TESTS = ["consume", "consumeproduce"]
-
-ALL_TEST = "all"
-STREAMS_SIMPLE_TEST = "streams-simple"
-STREAMS_COUNT_TEST = "streams-count"
-STREAMS_JOIN_TEST = "streams-join"
-
-
-class StreamsSimpleBenchmarkTest(Test):
-    """
-    Simple benchmark of Kafka Streams.
-    """
-
-    def __init__(self, test_context):
-        super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
-
-        # these values could be updated in ad-hoc benchmarks
-        self.key_skew = 0
-        self.value_size = 1024
-        self.num_records = 10000000L
-        self.num_threads = 1
-
-        self.replication = 1
-
-    @cluster(num_nodes=12)
-    @matrix(test=["consume", "consumeproduce",
-                  "streamprocess", "streamprocesswithsink", "streamprocesswithstatestore", "streamprocesswithwindowstore",
-                  "streamcount", "streamcountwindowed",
-                  "streamtablejoin", "streamstreamjoin", "tabletablejoin"],
-            scale=[1])
-    def test_simple_benchmark(self, test, scale):
-        """
-        Run simple Kafka Streams benchmark
-        """
-        self.driver = [None] * (scale + 1)
-
-        self.final = {}
-
-        #############
-        # SETUP PHASE
-        #############
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-        self.kafka = KafkaService(self.test_context, num_nodes=scale, zk=self.zk, version=DEV_BRANCH, topics={
-            'simpleBenchmarkSourceTopic1' : { 'partitions': scale, 'replication-factor': self.replication },
-            'simpleBenchmarkSourceTopic2' : { 'partitions': scale, 'replication-factor': self.replication },
-            'simpleBenchmarkSinkTopic' : { 'partitions': scale, 'replication-factor': self.replication },
-            'yahooCampaigns' : { 'partitions': 20, 'replication-factor': self.replication },
-            'yahooEvents' : { 'partitions': 20, 'replication-factor': self.replication }
-        })
-        self.kafka.log_level = "INFO"
-        self.kafka.start()
-
-
-        load_test = ""
-        if test == ALL_TEST:
-            load_test = "load-two"
-        if test in STREAMS_JOIN_TESTS or test == STREAMS_JOIN_TEST:
-            load_test = "load-two"
-        if test in STREAMS_COUNT_TESTS or test == STREAMS_COUNT_TEST:
-            load_test = "load-one"
-        if test in STREAMS_SIMPLE_TESTS or test == STREAMS_SIMPLE_TEST:
-            load_test = "load-one"
-        if test in NON_STREAMS_TESTS:
-            load_test = "load-one"
-
-
-
-        ################
-        # LOAD PHASE
-        ################
-        self.load_driver = StreamsSimpleBenchmarkService(self.test_context,
-                                                         self.kafka,
-                                                         load_test,
-                                                         self.num_threads,
-                                                         self.num_records,
-                                                         self.key_skew,
-                                                         self.value_size)
-
-        self.load_driver.start()
-        self.load_driver.wait(3600) # wait at most 1 hour (3600 seconds)
-        self.load_driver.stop()
-
-        if test == ALL_TEST:
-            for single_test in STREAMS_SIMPLE_TESTS + STREAMS_COUNT_TESTS + STREAMS_JOIN_TESTS:
-                self.execute(single_test, scale)
-        elif test == STREAMS_SIMPLE_TEST:
-            for single_test in STREAMS_SIMPLE_TESTS:
-                self.execute(single_test, scale)
-        elif test == STREAMS_COUNT_TEST:
-            for single_test in STREAMS_COUNT_TESTS:
-                self.execute(single_test, scale)
-        elif test == STREAMS_JOIN_TEST:
-            for single_test in STREAMS_JOIN_TESTS:
-                self.execute(single_test, scale)
-        else:
-            self.execute(test, scale)
-
-        return self.final
-
-    def execute(self, test, scale):
-
-        ################
-        # RUN PHASE
-        ################
-        for num in range(0, scale):
-            self.driver[num] = StreamsSimpleBenchmarkService(self.test_context,
-                                                             self.kafka,
-                                                             test,
-                                                             self.num_threads,
-                                                             self.num_records,
-                                                             self.key_skew,
-                                                             self.value_size)
-            self.driver[num].start()
-
-        #######################
-        # STOP + COLLECT PHASE
-        #######################
-        data = [None] * (scale)
-
-        for num in range(0, scale):
-            self.driver[num].wait()
-            self.driver[num].stop()
-            self.driver[num].node.account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
-            data[num] = self.driver[num].collect_data(self.driver[num].node, "")
-            self.driver[num].read_jmx_output_all_nodes()
-
-        for num in range(0, scale):
-            for key in data[num]:
-                self.final[key + "-" + str(num)] = data[num][key]
-
-            for key in sorted(self.driver[num].jmx_stats[0]):
-                self.logger.info("%s: %s" % (key, self.driver[num].jmx_stats[0][key]))
-
-            self.final[test + "-jmx-avg-" + str(num)] = self.driver[num].average_jmx_value
-            self.final[test + "-jmx-max-" + str(num)] = self.driver[num].maximum_jmx_value
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
deleted file mode 100644
index e2dd15b..0000000
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.services.streams import StreamsTestBaseService
-from kafkatest.services.kafka import KafkaConfig
-from kafkatest.services import streams_property
-
-#
-# Class used to start the simple Kafka Streams benchmark
-#
-
-class StreamsSimpleBenchmarkService(StreamsTestBaseService):
-    """Base class for simple Kafka Streams benchmark"""
-
-    def __init__(self, test_context, kafka, test_name, num_threads, num_recs_or_wait_ms, key_skew, value_size):
-        super(StreamsSimpleBenchmarkService, self).__init__(test_context,
-                                                            kafka,
-                                                            "org.apache.kafka.streams.perf.SimpleBenchmark",
-                                                            test_name,
-                                                            num_recs_or_wait_ms,
-                                                            key_skew,
-                                                            value_size)
-
-        self.jmx_option = ""
-        if test_name.startswith('stream') or test_name.startswith('table'):
-            self.jmx_option = "stream-jmx"
-            JmxMixin.__init__(self,
-                              num_nodes=1,
-                              jmx_object_names=['kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-%d' %(i+1) for i in range(num_threads)],
-                              jmx_attributes=['process-latency-avg',
-                                              'process-rate',
-                                              'commit-latency-avg',
-                                              'commit-rate',
-                                              'poll-latency-avg',
-                                              'poll-rate'],
-                              root=StreamsTestBaseService.PERSISTENT_ROOT)
-
-        if test_name.startswith('consume'):
-            self.jmx_option = "consumer-jmx"
-            JmxMixin.__init__(self,
-                              num_nodes=1,
-                              jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=simple-benchmark-consumer'],
-                              jmx_attributes=['records-consumed-rate'],
-                              root=StreamsTestBaseService.PERSISTENT_ROOT)
-
-        self.num_threads = num_threads
-
-    def prop_file(self):
-        cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT,
-                             streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
-                             streams_property.NUM_THREADS: self.num_threads})
-        return cfg.render()
-
-
-    def start_cmd(self, node):
-        if self.jmx_option != "":
-            args = self.args.copy()
-            args['jmx_port'] = self.jmx_port
-            args['config_file'] = self.CONFIG_FILE
-            args['stdout'] = self.STDOUT_FILE
-            args['stderr'] = self.STDERR_FILE
-            args['pidfile'] = self.PID_FILE
-            args['log4j'] = self.LOG4J_CONFIG_FILE
-            args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
-
-            cmd = "( export JMX_PORT=%(jmx_port)s; export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
-                  "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
-                  " %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
-                  " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
-
-        else:
-            cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
-
-        return cmd
-
-    def start_node(self, node):
-        super(StreamsSimpleBenchmarkService, self).start_node(node)
-
-        if self.jmx_option != "":
-            self.start_jmx_tool(1, node)
-
-    def clean_node(self, node):
-        if self.jmx_option != "":
-            JmxMixin.clean_node(self, node)
-
-        super(StreamsSimpleBenchmarkService, self).clean_node(node)
-
-    def collect_data(self, node, tag = None):
-        # Collect the data and return it to the framework
-        output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
-        data = {}
-        for line in output:
-            parts = line.split(':')
-            data[tag + parts[0]] = parts[1]
-        return data


Mime
View raw message