kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-2802: kafka streams system tests
Date Tue, 23 Feb 2016 20:14:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 68af16ac1 -> 3358e1682


KAFKA-2802: kafka streams system tests

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Geoff Anderson <geoff@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #930 from ymatsuda/streams_systest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3358e168
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3358e168
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3358e168

Branch: refs/heads/trunk
Commit: 3358e1682f034a82afd670be95505b2f620c78c6
Parents: 68af16a
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Tue Feb 23 12:14:26 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Feb 23 12:14:26 2016 -0800

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |   5 +
 bin/streams-smoke-test.sh                       |  23 +
 build.gradle                                    |   4 +
 .../streams/smoketest/SmokeTestClient.java      | 241 +++++++++
 .../streams/smoketest/SmokeTestDriver.java      | 495 +++++++++++++++++++
 .../kafka/streams/smoketest/SmokeTestUtil.java  | 184 +++++++
 .../streams/smoketest/StreamsSmokeTest.java     |  75 +++
 .../smoketest/TestTimestampExtractor.java       |  37 ++
 tests/kafkatest/services/streams.py             | 135 +++++
 tests/kafkatest/tests/streams_bounce_test.py    |  71 +++
 tests/kafkatest/tests/streams_smoke_test.py     |  73 +++
 11 files changed, 1343 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 9c98774..cb2556f 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -52,6 +52,11 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for file in $base_dir/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
 for file in $base_dir/tools/build/libs/kafka-tools*.jar;
 do
   CLASSPATH=$CLASSPATH:$file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/bin/streams-smoke-test.sh
----------------------------------------------------------------------
diff --git a/bin/streams-smoke-test.sh b/bin/streams-smoke-test.sh
new file mode 100755
index 0000000..196990e
--- /dev/null
+++ b/bin/streams-smoke-test.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+
+if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
+    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/tools-log4j.properties"
+fi
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.streams.smoketest.StreamsSmokeTest "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 275e250..feef93b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -541,6 +541,10 @@ project(':streams') {
     jar {
         dependsOn 'copyDependantLibs'
     }
+
+    systemTestLibs {
+        dependsOn testJar
+    }
 }
 
 project(':streams:examples') {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
new file mode 100644
index 0000000..7f1b343
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.TumblingWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.io.File;
+import java.util.Properties;
+
+public class SmokeTestClient extends SmokeTestUtil {
+
+    private final String kafka;
+    private final String zookeeper;
+    private final File stateDir;
+    private KafkaStreams streams;
+    private Thread thread;
+
+    public SmokeTestClient(File stateDir, String kafka, String zookeeper) {
+        super();
+        this.stateDir = stateDir;
+        this.kafka = kafka;
+        this.zookeeper = zookeeper;
+    }
+
+    public void start() {
+        streams = createKafkaStreams(stateDir, kafka, zookeeper);
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                e.printStackTrace();
+            }
+        });
+
+        thread = new Thread() {
+            public void run() {
+                streams.start();
+            }
+        };
+        thread.start();
+    }
+
+    public void close() {
+        streams.close();
+        try {
+            thread.join();
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+
+    private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.JOB_ID_CONFIG, "SmokeTest");
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class);
+        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, Integer> source = builder.stream(stringDeserializer, integerDeserializer, "data");
+
+        source.to("echo", stringSerializer, integerSerializer);
+
+        KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
+            @Override
+            public boolean test(String key, Integer value) {
+                return value == null || value != END;
+            }
+        });
+
+        data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
+
+        // min
+        data.aggregateByKey(
+                new Initializer<Integer>() {
+                    public Integer apply() {
+                        return Integer.MAX_VALUE;
+                    }
+                },
+                new Aggregator<String, Integer, Integer>() {
+                    @Override
+                    public Integer apply(String aggKey, Integer value, Integer aggregate) {
+                        return (value < aggregate) ? value : aggregate;
+                    }
+                },
+                UnlimitedWindows.of("uwin-min"),
+                stringSerializer,
+                integerSerializer,
+                stringDeserializer,
+                integerDeserializer
+        ).toStream().map(
+                new Unwindow<String, Integer>()
+        ).to("min", stringSerializer, integerSerializer);
+
+        KTable<String, Integer> minTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "min");
+        minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
+
+        // max
+        data.aggregateByKey(
+                new Initializer<Integer>() {
+                    public Integer apply() {
+                        return Integer.MIN_VALUE;
+                    }
+                },
+                new Aggregator<String, Integer, Integer>() {
+                    @Override
+                    public Integer apply(String aggKey, Integer value, Integer aggregate) {
+                        return (value > aggregate) ? value : aggregate;
+                    }
+                },
+                UnlimitedWindows.of("uwin-max"),
+                stringSerializer,
+                integerSerializer,
+                stringDeserializer,
+                integerDeserializer
+        ).toStream().map(
+                new Unwindow<String, Integer>()
+        ).to("max", stringSerializer, integerSerializer);
+
+        KTable<String, Integer> maxTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "max");
+        maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
+
+        // sum
+        data.aggregateByKey(
+                new Initializer<Long>() {
+                    public Long apply() {
+                        return 0L;
+                    }
+                },
+                new Aggregator<String, Integer, Long>() {
+                    @Override
+                    public Long apply(String aggKey, Integer value, Long aggregate) {
+                        return (long) value + aggregate;
+                    }
+                },
+                UnlimitedWindows.of("win-sum"),
+                stringSerializer,
+                longSerializer,
+                stringDeserializer,
+                longDeserializer
+        ).toStream().map(
+                new Unwindow<String, Long>()
+        ).to("sum", stringSerializer, longSerializer);
+
+
+        KTable<String, Long> sumTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "sum");
+        sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
+
+        // cnt
+        data.countByKey(
+                UnlimitedWindows.of("uwin-cnt"),
+                stringSerializer,
+                longSerializer,
+                stringDeserializer,
+                longDeserializer
+        ).toStream().map(
+                new Unwindow<String, Long>()
+        ).to("cnt", stringSerializer, longSerializer);
+
+        KTable<String, Long> cntTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "cnt");
+        cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
+
+        // dif
+        maxTable.join(minTable,
+                new ValueJoiner<Integer, Integer, Integer>() {
+                    public Integer apply(Integer value1, Integer value2) {
+                        return value1 - value2;
+                    }
+                }
+        ).to("dif", stringSerializer, integerSerializer);
+
+        // avg
+        sumTable.join(
+                cntTable,
+                new ValueJoiner<Long, Long, Double>() {
+                    public Double apply(Long value1, Long value2) {
+                        return (double) value1 / (double) value2;
+                    }
+                }
+        ).to("avg", stringSerializer, doubleSerializer);
+
+        // windowed count
+        data.countByKey(
+                TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
+                stringSerializer,
+                longSerializer,
+                stringDeserializer,
+                longDeserializer
+        ).toStream().map(
+                new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
+                    @Override
+                    public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
+                        return new KeyValue<>(key.value() + "@" + key.window().start(), value);
+                    }
+                }
+        ).to("wcnt", stringSerializer, longSerializer);
+
+        return new KafkaStreams(builder, props);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
new file mode 100644
index 0000000..e56a369
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+
+    private static class ValueList {
+        public final String key;
+        private final int[] values;
+        private int index;
+
+        ValueList(int min, int max) {
+            this.key = min + "-" + max;
+
+            this.values = new int[max - min + 1];
+            for (int i = 0; i < this.values.length; i++) {
+                this.values[i] = min + i;
+            }
+            // We want to randomize the order of data to test not completely predictable processing order
+            // However, values are also use as a timestamp of the record. (TODO: separate data and timestamp)
+            // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
+            shuffle(this.values, 10);
+
+            this.index = 0;
+        }
+
+        int next() {
+            return (index < values.length) ? values[index++] : -1;
+        }
+    }
+
+    // This main() is not used by the system test. It is intended to be used for local debugging.
+    public static void main(String[] args) throws Exception {
+        final String kafka = "localhost:9092";
+        final String zookeeper = "localhost:2181";
+        final File stateDir = createDir("/tmp/kafka-streams-smoketest");
+
+        final int numKeys = 10;
+        final int maxRecordsPerKey = 500;
+
+        Thread driver = new Thread() {
+            public void run() {
+                try {
+                    Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
+                    verify(kafka, allData, maxRecordsPerKey);
+                } catch (Exception ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        SmokeTestClient streams1 = new SmokeTestClient(createDir(stateDir, "1"), kafka, zookeeper);
+        SmokeTestClient streams2 = new SmokeTestClient(createDir(stateDir, "2"), kafka, zookeeper);
+        SmokeTestClient streams3 = new SmokeTestClient(createDir(stateDir, "3"), kafka, zookeeper);
+        SmokeTestClient streams4 = new SmokeTestClient(createDir(stateDir, "4"), kafka, zookeeper);
+
+        System.out.println("starting the driver");
+        driver.start();
+
+        System.out.println("starting the first and second client");
+        streams1.start();
+        streams2.start();
+
+        sleep(10000);
+
+        System.out.println("starting the third client");
+        streams3.start();
+
+        System.out.println("closing the first client");
+        streams1.close();
+        System.out.println("closed the first client");
+
+        sleep(10000);
+
+        System.out.println("starting the forth client");
+        streams4.start();
+
+        driver.join();
+
+        System.out.println("driver stopped");
+
+        streams2.close();
+        streams3.close();
+        streams4.close();
+
+        System.out.println("shutdown");
+    }
+
+    public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
+        Properties props = new Properties();
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+
+        int numRecordsProduced = 0;
+
+        Map<String, Set<Integer>> allData = new HashMap<>();
+        ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+            allData.put(data[i].key, new HashSet<Integer>());
+        }
+        Random rand = new Random();
+
+        int remaining = data.length;
+
+        while (remaining > 0) {
+            int index = rand.nextInt(remaining);
+            String key = data[index].key;
+            int value = data[index].next();
+
+            if (value < 0) {
+                remaining--;
+                data[index] = data[remaining];
+                value = END;
+            }
+
+            ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>("data", stringSerializer.serialize("", key), integerSerializer.serialize("", value));
+
+            producer.send(record);
+
+            if (value != END) {
+                numRecordsProduced++;
+                allData.get(key).add(value);
+
+                if (numRecordsProduced % 100 == 0)
+                    System.out.println(numRecordsProduced + " records produced");
+
+                Thread.sleep(10);
+            }
+        }
+
+        producer.close();
+
+        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void shuffle(int[] data, int windowSize) {
+        Random rand = new Random();
+        for (int i = 0; i < data.length; i++) {
+            // we shuffle data within windowSize
+            int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
+
+            // swap
+            int tmp = data[i];
+            data[i] = data[j];
+            data[j] = tmp;
+        }
+    }
+
+    public static void verify(String kafka, Map<String, Set<Integer>> allData, int maxRecordsPerKey) {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
+        List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt");
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions.toArray(new TopicPartition[partitions.size()]));
+
+        final int recordsGenerated = allData.size() * maxRecordsPerKey;
+        int recordsProcessed = 0;
+
+        HashMap<String, Integer> max = new HashMap<>();
+        HashMap<String, Integer> min = new HashMap<>();
+        HashMap<String, Integer> dif = new HashMap<>();
+        HashMap<String, Long> sum = new HashMap<>();
+        HashMap<String, Long> cnt = new HashMap<>();
+        HashMap<String, Double> avg = new HashMap<>();
+        HashMap<String, Long> wcnt = new HashMap<>();
+
+        HashSet<String> keys = new HashSet<>();
+        HashMap<String, Set<Integer>> received = new HashMap<>();
+        for (String key : allData.keySet()) {
+            keys.add(key);
+            received.put(key, new HashSet<Integer>());
+        }
+
+        int retryCount = 0;
+        int maxRetry = 240; // max two minutes (500ms * 240) (before we reach the end of records)
+
+        while (true) {
+            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+            if (records.isEmpty()) {
+                retryCount++;
+                if (retryCount > maxRetry) break;
+            } else {
+                retryCount = 0;
+
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    String key = stringDeserializer.deserialize("", record.key());
+                    switch (record.topic()) {
+                        case "echo":
+                            Integer value = integerDeserializer.deserialize("", record.value());
+                            if (value != null && value == END) {
+                                keys.remove(key);
+                                if (keys.isEmpty()) {
+                                    // we reached the end of records, set retry to 60 (max 30 seconds)
+                                    maxRetry = 60;
+                                }
+                            } else {
+                                recordsProcessed++;
+                                received.get(key).add(value);
+                            }
+                            break;
+                        case "min":
+                            min.put(key, integerDeserializer.deserialize("", record.value()));
+                            break;
+                        case "max":
+                            max.put(key, integerDeserializer.deserialize("", record.value()));
+                            break;
+                        case "dif":
+                            dif.put(key, integerDeserializer.deserialize("", record.value()));
+                            break;
+                        case "sum":
+                            sum.put(key, longDeserializer.deserialize("", record.value()));
+                            break;
+                        case "cnt":
+                            cnt.put(key, longDeserializer.deserialize("", record.value()));
+                            break;
+                        case "avg":
+                            avg.put(key, doubleDeserializer.deserialize("", record.value()));
+                            break;
+                        case "wcnt":
+                            wcnt.put(key, longDeserializer.deserialize("", record.value()));
+                            break;
+                        default:
+                            System.out.println("unknown topic: " + record.topic());
+                    }
+                }
+            }
+        }
+
+
+        System.out.println("-------------------");
+        System.out.println("Result Verification");
+        System.out.println("-------------------");
+        System.out.println("recordGenerated=" + recordsGenerated);
+        System.out.println("recordProcessed=" + recordsProcessed);
+
+        if (recordsProcessed > recordsGenerated) {
+            System.out.println("PROCESSED-MORE-THAN-GENERATED");
+        } else if (recordsProcessed < recordsGenerated) {
+            System.out.println("PROCESSED-LESS-THAN-GENERATED");
+        }
+
+        boolean success;
+        success = allData.equals(received);
+
+        if (success) {
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } else {
+            int missedCount = 0;
+            for (Map.Entry<String, Set<Integer>> entry : allData.entrySet()) {
+                missedCount += received.get(entry.getKey()).size();
+            }
+            System.out.println("missedRecords=" + missedCount);
+        }
+
+        success &= verifyMin(min);
+        success &= verifyMax(max);
+        success &= verifyDif(dif);
+        success &= verifySum(sum);
+        success &= verifyCnt(cnt);
+        success &= verifyAvg(avg);
+        success &= verifyWCnt(wcnt);
+
+        System.out.println(success ? "SUCCESS" : "FAILURE");
+    }
+
+    private static boolean verifyMin(Map<String, Integer> map) {
+        boolean success = true;
+        if (map.isEmpty()) {
+            System.out.println("min is empty");
+            success = false;
+        } else {
+            System.out.println("verifying min");
+
+            for (Map.Entry<String, Integer> entry : map.entrySet()) {
+                int expected = getMin(entry.getKey());
+                if (expected != entry.getValue()) {
+                    System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue() + " expected=" + expected);
+                    success = false;
+                }
+            }
+        }
+        return success;
+    }
+
+    private static boolean verifyMax(Map<String, Integer> map) {
+        boolean success = true;
+        if (map.isEmpty()) {
+            System.out.println("max is empty");
+            success = false;
+        } else {
+            System.out.println("verifying max");
+
+            for (Map.Entry<String, Integer> entry : map.entrySet()) {
+                int expected = getMax(entry.getKey());
+                if (expected != entry.getValue()) {
+                    System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue() + " expected=" + expected);
+                    success = false;
+                }
+            }
+        }
+        return success;
+    }
+
+    private static boolean verifyDif(Map<String, Integer> map) {
+        boolean success = true;
+        if (map.isEmpty()) {
+            System.out.println("dif is empty");
+            success = false;
+        } else {
+            System.out.println("verifying dif");
+
+            for (Map.Entry<String, Integer> entry : map.entrySet()) {
+                int min = getMin(entry.getKey());
+                int max = getMax(entry.getKey());
+                int expected = max - min;
+                if (expected != entry.getValue()) {
+                    System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue() + " expected=" + expected);
+                    success = false;
+                }
+            }
+        }
+        return success;
+    }
+
+    private static boolean verifyCnt(Map<String, Long> map) {
+        boolean success = true;
+        if (map.isEmpty()) {
+            System.out.println("cnt is empty");
+            success = false;
+        } else {
+            System.out.println("verifying cnt");
+
+            for (Map.Entry<String, Long> entry : map.entrySet()) {
+                int min = getMin(entry.getKey());
+                int max = getMax(entry.getKey());
+                long expected = (max - min) + 1L;
+                if (expected != entry.getValue()) {
+                    System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue() + " expected=" + expected);
+                    success = false;
+                }
+            }
+        }
+        return success;
+    }
+
+    private static boolean verifySum(Map<String, Long> map) {
+        boolean success = true;
+        if (map.isEmpty()) {
+            System.out.println("sum is empty");
+            success = false;
+        } else {
+            System.out.println("verifying sum");
+
+            for (Map.Entry<String, Long> entry : map.entrySet()) {
+                int min = getMin(entry.getKey());
+                int max = getMax(entry.getKey());
+                long expected = ((long) min + (long) max) * (max - min + 1L) / 2L;
+                if (expected != entry.getValue()) {
+                    System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue() + " expected=" + expected);
+                    success = false;
+                }
+            }
+        }
+        return success;
+    }
+
+    private static boolean verifyAvg(Map<String, Double> map) {
+        boolean success = true;
+        if (map.isEmpty()) {
+            System.out.println("avg is empty");
+            success = false;
+        } else {
+            System.out.println("verifying avg");
+
+            for (Map.Entry<String, Double> entry : map.entrySet()) {
+                int min = getMin(entry.getKey());
+                int max = getMax(entry.getKey());
+                double expected = ((long) min + (long) max) / 2.0;
+
+                if (expected != entry.getValue()) {
+                    System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue() + " expected=" + expected);
+                    success = false;
+                }
+            }
+        }
+        return success;
+    }
+
+    private static boolean verifyWCnt(Map<String, Long> map) {
+        boolean success = true;
+        if (map.isEmpty()) {
+            System.out.println("wcnt is empty");
+            success = false;
+        } else {
+            System.out.println("verifying wcnt");
+
+            for (Map.Entry<String, Long> entry : map.entrySet()) {
+                long minTime = getMinFromWKey(entry.getKey()) + START_TIME;
+                long maxTime = getMaxFromWKey(entry.getKey()) + START_TIME;
+                long winTime = getStartFromWKey(entry.getKey());
+
+                long expected = WINDOW_SIZE;
+                if (minTime > winTime) expected -= minTime - winTime;
+                if (maxTime < winTime + WINDOW_SIZE - 1) expected -= winTime + WINDOW_SIZE - 1 - maxTime;
+
+                if (expected != entry.getValue()) {
+                    System.out.println("fail: key=" + entry.getKey() + " wcnt=" + entry.getValue() + " expected=" + expected);
+                    success = false;
+                }
+            }
+        }
+        return success;
+    }
+
+    private static int getMin(String key) {
+        return Integer.parseInt(key.split("-")[0]);
+    }
+
+    private static int getMax(String key) {
+        return Integer.parseInt(key.split("-")[1]);
+    }
+
+    private static int getMinFromWKey(String key) {
+        return getMin(key.split("@")[0]);
+    }
+
+    private static int getMaxFromWKey(String key) {
+        return getMax(key.split("@")[0]);
+    }
+
+    private static long getStartFromWKey(String key) {
+        return Long.parseLong(key.split("@")[1]);
+    }
+
+    private static List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
+        ArrayList<TopicPartition> partitions = new ArrayList<>();
+
+        for (String topic : topics) {
+            for (PartitionInfo info : consumer.partitionsFor(topic)) {
+                partitions.add(new TopicPartition(info.topic(), info.partition()));
+            }
+        }
+        return partitions;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
new file mode 100644
index 0000000..4a13599
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.io.File;
+import java.util.Map;
+
+public class SmokeTestUtil {
+
+    public final static int WINDOW_SIZE = 100;
+    public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
+    public final static int END = Integer.MAX_VALUE;
+
+    public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic) {
+        return printProcessorSupplier(topic, false);
+    }
+
+    public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic, final boolean printOffset) {
+        return new ProcessorSupplier<String, T>() {
+            public Processor<String, T> get() {
+                return new Processor<String, T>() {
+                    private int numRecordsProcessed = 0;
+                    private ProcessorContext context;
+
+                    @Override
+                    public void init(ProcessorContext context) {
+                        System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                        this.context = context;
+                    }
+
+                    @Override
+                    public void process(String key, T value) {
+                        if (printOffset) System.out.println(">>> " + context.offset());
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(long timestamp) {
+                    }
+
+                    @Override
+                    public void close() {
+                    }
+                };
+            }
+        };
+    }
+
+    public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
+        public KeyValue<K, V> apply(Windowed<K> winKey, V value) {
+            return new KeyValue<K, V>(winKey.value(), value);
+        }
+    }
+
+    public static Serializer<String> stringSerializer = new StringSerializer();
+
+    public static Deserializer<String> stringDeserializer = new StringDeserializer();
+
+    public static Serializer<Integer> integerSerializer = new IntegerSerializer();
+
+    public static Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
+
+    public static Serializer<Long> longSerializer = new LongSerializer();
+
+    public static Deserializer<Long> longDeserializer = new LongDeserializer();
+
+    public static Serializer<Double> doubleSerializer = new Serializer<Double>() {
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+        }
+
+        @Override
+        public byte[] serialize(String topic, Double data) {
+            if (data == null)
+                return null;
+
+            long bits = Double.doubleToLongBits(data);
+            return new byte[] {
+                (byte) (bits >>> 56),
+                (byte) (bits >>> 48),
+                (byte) (bits >>> 40),
+                (byte) (bits >>> 32),
+                (byte) (bits >>> 24),
+                (byte) (bits >>> 16),
+                (byte) (bits >>> 8),
+                (byte) bits
+            };
+        }
+
+        @Override
+        public void close() {
+        }
+    };
+
+    public static Deserializer<Double> doubleDeserializer = new Deserializer<Double>() {
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+        }
+
+        @Override
+        public Double deserialize(String topic, byte[] data) {
+            if (data == null)
+                return null;
+            if (data.length != 8) {
+                throw new SerializationException("Size of data received by Deserializer is " +
+                        "not 8");
+            }
+
+            long value = 0;
+            for (byte b : data) {
+                value <<= 8;
+                value |= b & 0xFF;
+            }
+            return Double.longBitsToDouble(value);
+        }
+
+        @Override
+        public void close() {
+        }
+    };
+
+    public static File createDir(String path) throws Exception {
+        File dir = new File(path);
+
+        dir.mkdir();
+
+        return dir;
+    }
+
+    public static File createDir(File parent, String child) throws Exception {
+        File dir = new File(parent, child);
+
+        dir.mkdir();
+
+        return dir;
+    }
+
+    public static void sleep(long duration) {
+        try {
+            Thread.sleep(duration);
+        } catch (Exception ex) {
+            //
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
new file mode 100644
index 0000000..a6cd141
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Created by yasuhiro on 2/10/16.
+ */
+public class StreamsSmokeTest {
+
+    /**
+     *  args ::= command kafka zookeeper stateDir
+     *  command := "run" | "process"
+     *
+     * @param args
+     */
+    public static void main(String[] args) throws Exception {
+        String command = args[0];
+        String kafka = args.length > 1 ? args[1] : null;
+        String zookeeper = args.length > 2 ? args[2] : null;
+        String stateDir = args.length > 3 ? args[3] : null;
+
+        System.out.println("StreamsSmokeTest instance started");
+        System.out.println("command=" + command);
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+
+        switch (command) {
+            case "standalone":
+                SmokeTestDriver.main(args);
+                break;
+            case "run":
+                // this starts the driver (data generation and result verification)
+                final int numKeys = 10;
+                final int maxRecordsPerKey = 500;
+                Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                break;
+            case "process":
+                // this starts a KafkaStreams client
+                final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka, zookeeper);
+                client.start();
+
+                Runtime.getRuntime().addShutdownHook(new Thread() {
+                    @Override
+                    public void run() {
+                        client.close();
+                    }
+                });
+                break;
+            default:
+                System.out.println("unknown command: " + command);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
new file mode 100644
index 0000000..04e264c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+public class TestTimestampExtractor implements TimestampExtractor {
+
+    private final long base = SmokeTestUtil.START_TIME;
+
+    @Override
+    public long extract(ConsumerRecord<Object, Object> record) {
+        switch (record.topic()) {
+            case "data":
+                return base + (Integer) record.value();
+            default:
+                return System.currentTimeMillis();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
new file mode 100644
index 0000000..192a8d9
--- /dev/null
+++ b/tests/kafkatest/services/streams.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+from ducktape.errors import DucktapeError
+
+from kafkatest.services.kafka.directory import kafka_dir
+import signal, random, requests, os.path, json
+
+class StreamsSmokeTestBaseService(Service):
+    """Base class for Streams Smoke Test services providing some common settings and functionality"""
+
+    PERSISTENT_ROOT = "/mnt/streams"
+    # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
+    LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
+    STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
+    STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
+    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
+
+    logs = {
+        "streams_log": {
+            "path": LOG_FILE,
+            "collect_default": True},
+        "streams_stdout": {
+            "path": STDOUT_FILE,
+            "collect_default": True},
+        "streams_stderr": {
+            "path": STDERR_FILE,
+            "collect_default": True},
+    }
+
+    def __init__(self, context, kafka, command):
+        super(StreamsSmokeTestBaseService, self).__init__(context, 1)
+        self.kafka = kafka
+        self.args = { 'command': command }
+
+    @property
+    def node(self):
+        return self.nodes[0]
+
+    def pids(self, node):
+        try:
+            return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
+        except:
+            return []
+
+    def stop_node(self, node, clean_shutdown=True):
+        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Smoke Test on " + str(node.account))
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            node.account.signal(pid, sig, allow_fail=True)
+        if clean_shutdown:
+            for pid in pids:
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
+
+        node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
+
+    def restart(self):
+        # We don't want to do any clean up here, just restart the process.
+        for node in self.nodes:
+            self.logger.info("Restarting Kafka Streams on " + str(node.account))
+            self.stop_node(node)
+            self.start_node(node)
+
+    def abortThenRestart(self):
+        # We don't want to do any clean up here, just abort then restart the process. The running service is killed immediately.
+        for node in self.nodes:
+            self.logger.info("Aborting Kafka Streams on " + str(node.account))
+            self.stop_node(node, False)
+            self.logger.info("Restarting Kafka Streams on " + str(node.account))
+            self.start_node(node)
+
+    def wait(self):
+        for node in self.nodes:
+            for pid in self.pids(node):
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=120, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
+
+    def clean_node(self, node):
+        node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['zk'] = self.kafka.zk.connect_setting()
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['kafka_dir'] = kafka_dir(node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "/opt/%(kafka_dir)s/bin/streams-smoke-test.sh %(command)s %(kafka)s %(zk)s %(state_dir)s " \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
+
+    def start_node(self, node):
+        node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+
+        node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
+
+        self.logger.info("Starting StreamsSmokeTest process on " + str(node.account))
+        with node.account.monitor_log(self.STDOUT_FILE) as monitor:
+            node.account.ssh(self.start_cmd(node))
+            monitor.wait_until('StreamsSmokeTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsSmokeTest finished startup on " + str(node.account))
+
+        if len(self.pids(node)) == 0:
+            raise RuntimeError("No process ids recorded")
+
+
+class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
+    def __init__(self, context, kafka):
+        super(StreamsSmokeTestDriverService, self).__init__(context, kafka, "run")
+
+class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
+    def __init__(self, context, kafka):
+        super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/tests/kafkatest/tests/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_bounce_test.py b/tests/kafkatest/tests/streams_bounce_test.py
new file mode 100644
index 0000000..176f010
--- /dev/null
+++ b/tests/kafkatest/tests/streams_bounce_test.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from ducktape.utils.util import wait_until
+import time
+
+class StreamsBounceTest(KafkaTest):
+    """
+    Simple test of Kafka Streams.
+    """
+
+    def __init__(self, test_context):
+        super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
+            'echo' : { 'partitions': 5, 'replication-factor': 2 },
+            'data' : { 'partitions': 5, 'replication-factor': 2 },
+            'min' : { 'partitions': 5, 'replication-factor': 2 },
+            'max' : { 'partitions': 5, 'replication-factor': 2 },
+            'sum' : { 'partitions': 5, 'replication-factor': 2 },
+            'dif' : { 'partitions': 5, 'replication-factor': 2 },
+            'cnt' : { 'partitions': 5, 'replication-factor': 2 },
+            'avg' : { 'partitions': 5, 'replication-factor': 2 },
+            'wcnt' : { 'partitions': 5, 'replication-factor': 2 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+
+    def test_bounce(self):
+        """
+        Start a smoke test client, then abort (kill -9) and restart it a few times.
+        Ensure that all records are delivered.
+        """
+
+        self.driver.start()
+
+        self.processor1.start()
+
+        time.sleep(15);
+
+        self.processor1.abortThenRestart()
+
+        time.sleep(15);
+
+        # enable this after we add change log partition replicas
+        #self.kafka.signal_leader("data")
+
+        time.sleep(15);
+
+        self.processor1.abortThenRestart()
+
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor1.stop()
+
+        node = self.driver.node
+        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/tests/kafkatest/tests/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_smoke_test.py b/tests/kafkatest/tests/streams_smoke_test.py
new file mode 100644
index 0000000..2861837
--- /dev/null
+++ b/tests/kafkatest/tests/streams_smoke_test.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from ducktape.utils.util import wait_until
+import time
+
+class StreamsSmokeTest(KafkaTest):
+    """
+    Simple test of Kafka Streams.
+    """
+
+    def __init__(self, test_context):
+        super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'echo' : { 'partitions': 5, 'replication-factor': 1 },
+            'data' : { 'partitions': 5, 'replication-factor': 1 },
+            'min' : { 'partitions': 5, 'replication-factor': 1 },
+            'max' : { 'partitions': 5, 'replication-factor': 1 },
+            'sum' : { 'partitions': 5, 'replication-factor': 1 },
+            'dif' : { 'partitions': 5, 'replication-factor': 1 },
+            'cnt' : { 'partitions': 5, 'replication-factor': 1 },
+            'avg' : { 'partitions': 5, 'replication-factor': 1 },
+            'wcnt' : { 'partitions': 5, 'replication-factor': 1 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+
+    def test_streams(self):
+        """
+        Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.
+        Ensure that all results (stats on values computed by Kafka Streams) are correct.
+        """
+
+        self.driver.start()
+
+        self.processor1.start()
+        self.processor2.start()
+
+        time.sleep(15);
+
+        self.processor3.start()
+        self.processor1.stop()
+
+        time.sleep(15);
+
+        self.processor4.start();
+
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor2.stop()
+        self.processor3.stop()
+        self.processor4.stop()
+
+        node = self.driver.node
+        node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)


Mime
View raw message