Repository: kafka
Updated Branches:
refs/heads/trunk 68af16ac1 -> 3358e1682
KAFKA-2802: kafka streams system tests
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Geoff Anderson <geoff@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes #930 from ymatsuda/streams_systest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3358e168
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3358e168
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3358e168
Branch: refs/heads/trunk
Commit: 3358e1682f034a82afd670be95505b2f620c78c6
Parents: 68af16a
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Tue Feb 23 12:14:26 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Feb 23 12:14:26 2016 -0800
----------------------------------------------------------------------
bin/kafka-run-class.sh | 5 +
bin/streams-smoke-test.sh | 23 +
build.gradle | 4 +
.../streams/smoketest/SmokeTestClient.java | 241 +++++++++
.../streams/smoketest/SmokeTestDriver.java | 495 +++++++++++++++++++
.../kafka/streams/smoketest/SmokeTestUtil.java | 184 +++++++
.../streams/smoketest/StreamsSmokeTest.java | 75 +++
.../smoketest/TestTimestampExtractor.java | 37 ++
tests/kafkatest/services/streams.py | 135 +++++
tests/kafkatest/tests/streams_bounce_test.py | 71 +++
tests/kafkatest/tests/streams_smoke_test.py | 73 +++
11 files changed, 1343 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 9c98774..cb2556f 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -52,6 +52,11 @@ do
CLASSPATH=$CLASSPATH:$file
done
+for file in $base_dir/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
for file in $base_dir/tools/build/libs/kafka-tools*.jar;
do
CLASSPATH=$CLASSPATH:$file
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/bin/streams-smoke-test.sh
----------------------------------------------------------------------
diff --git a/bin/streams-smoke-test.sh b/bin/streams-smoke-test.sh
new file mode 100755
index 0000000..196990e
--- /dev/null
+++ b/bin/streams-smoke-test.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+
+if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
+ export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/tools-log4j.properties"
+fi
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.streams.smoketest.StreamsSmokeTest "$@"
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 275e250..feef93b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -541,6 +541,10 @@ project(':streams') {
jar {
dependsOn 'copyDependantLibs'
}
+
+ systemTestLibs {
+ dependsOn testJar
+ }
}
project(':streams:examples') {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
new file mode 100644
index 0000000..7f1b343
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.TumblingWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.io.File;
+import java.util.Properties;
+
+public class SmokeTestClient extends SmokeTestUtil {
+
+ private final String kafka;
+ private final String zookeeper;
+ private final File stateDir;
+ private KafkaStreams streams;
+ private Thread thread;
+
+ public SmokeTestClient(File stateDir, String kafka, String zookeeper) {
+ super();
+ this.stateDir = stateDir;
+ this.kafka = kafka;
+ this.zookeeper = zookeeper;
+ }
+
+ public void start() {
+ streams = createKafkaStreams(stateDir, kafka, zookeeper);
+ streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ e.printStackTrace();
+ }
+ });
+
+ thread = new Thread() {
+ public void run() {
+ streams.start();
+ }
+ };
+ thread.start();
+ }
+
+ public void close() {
+ streams.close();
+ try {
+ thread.join();
+ } catch (Exception ex) {
+ // ignore
+ }
+ }
+
+ private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) {
+ Properties props = new Properties();
+ props.put(StreamsConfig.JOB_ID_CONFIG, "SmokeTest");
+ props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class);
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+ props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+ props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ KStreamBuilder builder = new KStreamBuilder();
+
+ KStream<String, Integer> source = builder.stream(stringDeserializer, integerDeserializer, "data");
+
+ source.to("echo", stringSerializer, integerSerializer);
+
+ KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
+ @Override
+ public boolean test(String key, Integer value) {
+ return value == null || value != END;
+ }
+ });
+
+ data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
+
+ // min
+ data.aggregateByKey(
+ new Initializer<Integer>() {
+ public Integer apply() {
+ return Integer.MAX_VALUE;
+ }
+ },
+ new Aggregator<String, Integer, Integer>() {
+ @Override
+ public Integer apply(String aggKey, Integer value, Integer aggregate) {
+ return (value < aggregate) ? value : aggregate;
+ }
+ },
+ UnlimitedWindows.of("uwin-min"),
+ stringSerializer,
+ integerSerializer,
+ stringDeserializer,
+ integerDeserializer
+ ).toStream().map(
+ new Unwindow<String, Integer>()
+ ).to("min", stringSerializer, integerSerializer);
+
+ KTable<String, Integer> minTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "min");
+ minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
+
+ // max
+ data.aggregateByKey(
+ new Initializer<Integer>() {
+ public Integer apply() {
+ return Integer.MIN_VALUE;
+ }
+ },
+ new Aggregator<String, Integer, Integer>() {
+ @Override
+ public Integer apply(String aggKey, Integer value, Integer aggregate) {
+ return (value > aggregate) ? value : aggregate;
+ }
+ },
+ UnlimitedWindows.of("uwin-max"),
+ stringSerializer,
+ integerSerializer,
+ stringDeserializer,
+ integerDeserializer
+ ).toStream().map(
+ new Unwindow<String, Integer>()
+ ).to("max", stringSerializer, integerSerializer);
+
+ KTable<String, Integer> maxTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "max");
+ maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
+
+ // sum
+ data.aggregateByKey(
+ new Initializer<Long>() {
+ public Long apply() {
+ return 0L;
+ }
+ },
+ new Aggregator<String, Integer, Long>() {
+ @Override
+ public Long apply(String aggKey, Integer value, Long aggregate) {
+ return (long) value + aggregate;
+ }
+ },
+ UnlimitedWindows.of("win-sum"),
+ stringSerializer,
+ longSerializer,
+ stringDeserializer,
+ longDeserializer
+ ).toStream().map(
+ new Unwindow<String, Long>()
+ ).to("sum", stringSerializer, longSerializer);
+
+
+ KTable<String, Long> sumTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "sum");
+ sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
+
+ // cnt
+ data.countByKey(
+ UnlimitedWindows.of("uwin-cnt"),
+ stringSerializer,
+ longSerializer,
+ stringDeserializer,
+ longDeserializer
+ ).toStream().map(
+ new Unwindow<String, Long>()
+ ).to("cnt", stringSerializer, longSerializer);
+
+ KTable<String, Long> cntTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "cnt");
+ cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
+
+ // dif
+ maxTable.join(minTable,
+ new ValueJoiner<Integer, Integer, Integer>() {
+ public Integer apply(Integer value1, Integer value2) {
+ return value1 - value2;
+ }
+ }
+ ).to("dif", stringSerializer, integerSerializer);
+
+ // avg
+ sumTable.join(
+ cntTable,
+ new ValueJoiner<Long, Long, Double>() {
+ public Double apply(Long value1, Long value2) {
+ return (double) value1 / (double) value2;
+ }
+ }
+ ).to("avg", stringSerializer, doubleSerializer);
+
+ // windowed count
+ data.countByKey(
+ TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
+ stringSerializer,
+ longSerializer,
+ stringDeserializer,
+ longDeserializer
+ ).toStream().map(
+ new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
+ @Override
+ public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
+ return new KeyValue<>(key.value() + "@" + key.window().start(), value);
+ }
+ }
+ ).to("wcnt", stringSerializer, longSerializer);
+
+ return new KafkaStreams(builder, props);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
new file mode 100644
index 0000000..e56a369
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+
+ private static class ValueList {
+ public final String key;
+ private final int[] values;
+ private int index;
+
+ ValueList(int min, int max) {
+ this.key = min + "-" + max;
+
+ this.values = new int[max - min + 1];
+ for (int i = 0; i < this.values.length; i++) {
+ this.values[i] = min + i;
+ }
+ // We want to randomize the order of data to test not completely predictable processing order
+ // However, values are also use as a timestamp of the record. (TODO: separate data and timestamp)
+ // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
+ shuffle(this.values, 10);
+
+ this.index = 0;
+ }
+
+ int next() {
+ return (index < values.length) ? values[index++] : -1;
+ }
+ }
+
+ // This main() is not used by the system test. It is intended to be used for local debugging.
+ public static void main(String[] args) throws Exception {
+ final String kafka = "localhost:9092";
+ final String zookeeper = "localhost:2181";
+ final File stateDir = createDir("/tmp/kafka-streams-smoketest");
+
+ final int numKeys = 10;
+ final int maxRecordsPerKey = 500;
+
+ Thread driver = new Thread() {
+ public void run() {
+ try {
+ Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
+ verify(kafka, allData, maxRecordsPerKey);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ };
+
+ SmokeTestClient streams1 = new SmokeTestClient(createDir(stateDir, "1"), kafka, zookeeper);
+ SmokeTestClient streams2 = new SmokeTestClient(createDir(stateDir, "2"), kafka, zookeeper);
+ SmokeTestClient streams3 = new SmokeTestClient(createDir(stateDir, "3"), kafka, zookeeper);
+ SmokeTestClient streams4 = new SmokeTestClient(createDir(stateDir, "4"), kafka, zookeeper);
+
+ System.out.println("starting the driver");
+ driver.start();
+
+ System.out.println("starting the first and second client");
+ streams1.start();
+ streams2.start();
+
+ sleep(10000);
+
+ System.out.println("starting the third client");
+ streams3.start();
+
+ System.out.println("closing the first client");
+ streams1.close();
+ System.out.println("closed the first client");
+
+ sleep(10000);
+
+ System.out.println("starting the forth client");
+ streams4.start();
+
+ driver.join();
+
+ System.out.println("driver stopped");
+
+ streams2.close();
+ streams3.close();
+ streams4.close();
+
+ System.out.println("shutdown");
+ }
+
+ public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
+ Properties props = new Properties();
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+ KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+
+ int numRecordsProduced = 0;
+
+ Map<String, Set<Integer>> allData = new HashMap<>();
+ ValueList[] data = new ValueList[numKeys];
+ for (int i = 0; i < numKeys; i++) {
+ data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+ allData.put(data[i].key, new HashSet<Integer>());
+ }
+ Random rand = new Random();
+
+ int remaining = data.length;
+
+ while (remaining > 0) {
+ int index = rand.nextInt(remaining);
+ String key = data[index].key;
+ int value = data[index].next();
+
+ if (value < 0) {
+ remaining--;
+ data[index] = data[remaining];
+ value = END;
+ }
+
+ ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>("data", stringSerializer.serialize("", key), integerSerializer.serialize("", value));
+
+ producer.send(record);
+
+ if (value != END) {
+ numRecordsProduced++;
+ allData.get(key).add(value);
+
+ if (numRecordsProduced % 100 == 0)
+ System.out.println(numRecordsProduced + " records produced");
+
+ Thread.sleep(10);
+ }
+ }
+
+ producer.close();
+
+ return Collections.unmodifiableMap(allData);
+ }
+
+ private static void shuffle(int[] data, int windowSize) {
+ Random rand = new Random();
+ for (int i = 0; i < data.length; i++) {
+ // we shuffle data within windowSize
+ int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
+
+ // swap
+ int tmp = data[i];
+ data[i] = data[j];
+ data[j] = tmp;
+ }
+ }
+
+ public static void verify(String kafka, Map<String, Set<Integer>> allData, int maxRecordsPerKey) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
+ List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt");
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions.toArray(new TopicPartition[partitions.size()]));
+
+ final int recordsGenerated = allData.size() * maxRecordsPerKey;
+ int recordsProcessed = 0;
+
+ HashMap<String, Integer> max = new HashMap<>();
+ HashMap<String, Integer> min = new HashMap<>();
+ HashMap<String, Integer> dif = new HashMap<>();
+ HashMap<String, Long> sum = new HashMap<>();
+ HashMap<String, Long> cnt = new HashMap<>();
+ HashMap<String, Double> avg = new HashMap<>();
+ HashMap<String, Long> wcnt = new HashMap<>();
+
+ HashSet<String> keys = new HashSet<>();
+ HashMap<String, Set<Integer>> received = new HashMap<>();
+ for (String key : allData.keySet()) {
+ keys.add(key);
+ received.put(key, new HashSet<Integer>());
+ }
+
+ int retryCount = 0;
+ int maxRetry = 240; // max two minutes (500ms * 240) (before we reach the end of records)
+
+ while (true) {
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+ if (records.isEmpty()) {
+ retryCount++;
+ if (retryCount > maxRetry) break;
+ } else {
+ retryCount = 0;
+
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ String key = stringDeserializer.deserialize("", record.key());
+ switch (record.topic()) {
+ case "echo":
+ Integer value = integerDeserializer.deserialize("", record.value());
+ if (value != null && value == END) {
+ keys.remove(key);
+ if (keys.isEmpty()) {
+ // we reached the end of records, set retry to 60 (max 30 seconds)
+ maxRetry = 60;
+ }
+ } else {
+ recordsProcessed++;
+ received.get(key).add(value);
+ }
+ break;
+ case "min":
+ min.put(key, integerDeserializer.deserialize("", record.value()));
+ break;
+ case "max":
+ max.put(key, integerDeserializer.deserialize("", record.value()));
+ break;
+ case "dif":
+ dif.put(key, integerDeserializer.deserialize("", record.value()));
+ break;
+ case "sum":
+ sum.put(key, longDeserializer.deserialize("", record.value()));
+ break;
+ case "cnt":
+ cnt.put(key, longDeserializer.deserialize("", record.value()));
+ break;
+ case "avg":
+ avg.put(key, doubleDeserializer.deserialize("", record.value()));
+ break;
+ case "wcnt":
+ wcnt.put(key, longDeserializer.deserialize("", record.value()));
+ break;
+ default:
+ System.out.println("unknown topic: " + record.topic());
+ }
+ }
+ }
+ }
+
+
+ System.out.println("-------------------");
+ System.out.println("Result Verification");
+ System.out.println("-------------------");
+ System.out.println("recordGenerated=" + recordsGenerated);
+ System.out.println("recordProcessed=" + recordsProcessed);
+
+ if (recordsProcessed > recordsGenerated) {
+ System.out.println("PROCESSED-MORE-THAN-GENERATED");
+ } else if (recordsProcessed < recordsGenerated) {
+ System.out.println("PROCESSED-LESS-THAN-GENERATED");
+ }
+
+ boolean success;
+ success = allData.equals(received);
+
+ if (success) {
+ System.out.println("ALL-RECORDS-DELIVERED");
+ } else {
+ int missedCount = 0;
+ for (Map.Entry<String, Set<Integer>> entry : allData.entrySet()) {
+ missedCount += received.get(entry.getKey()).size();
+ }
+ System.out.println("missedRecords=" + missedCount);
+ }
+
+ success &= verifyMin(min);
+ success &= verifyMax(max);
+ success &= verifyDif(dif);
+ success &= verifySum(sum);
+ success &= verifyCnt(cnt);
+ success &= verifyAvg(avg);
+ success &= verifyWCnt(wcnt);
+
+ System.out.println(success ? "SUCCESS" : "FAILURE");
+ }
+
+ private static boolean verifyMin(Map<String, Integer> map) {
+ boolean success = true;
+ if (map.isEmpty()) {
+ System.out.println("min is empty");
+ success = false;
+ } else {
+ System.out.println("verifying min");
+
+ for (Map.Entry<String, Integer> entry : map.entrySet()) {
+ int expected = getMin(entry.getKey());
+ if (expected != entry.getValue()) {
+ System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue() + " expected=" + expected);
+ success = false;
+ }
+ }
+ }
+ return success;
+ }
+
+ private static boolean verifyMax(Map<String, Integer> map) {
+ boolean success = true;
+ if (map.isEmpty()) {
+ System.out.println("max is empty");
+ success = false;
+ } else {
+ System.out.println("verifying max");
+
+ for (Map.Entry<String, Integer> entry : map.entrySet()) {
+ int expected = getMax(entry.getKey());
+ if (expected != entry.getValue()) {
+ System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue() + " expected=" + expected);
+ success = false;
+ }
+ }
+ }
+ return success;
+ }
+
+ private static boolean verifyDif(Map<String, Integer> map) {
+ boolean success = true;
+ if (map.isEmpty()) {
+ System.out.println("dif is empty");
+ success = false;
+ } else {
+ System.out.println("verifying dif");
+
+ for (Map.Entry<String, Integer> entry : map.entrySet()) {
+ int min = getMin(entry.getKey());
+ int max = getMax(entry.getKey());
+ int expected = max - min;
+ if (expected != entry.getValue()) {
+ System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue() + " expected=" + expected);
+ success = false;
+ }
+ }
+ }
+ return success;
+ }
+
+ private static boolean verifyCnt(Map<String, Long> map) {
+ boolean success = true;
+ if (map.isEmpty()) {
+ System.out.println("cnt is empty");
+ success = false;
+ } else {
+ System.out.println("verifying cnt");
+
+ for (Map.Entry<String, Long> entry : map.entrySet()) {
+ int min = getMin(entry.getKey());
+ int max = getMax(entry.getKey());
+ long expected = (max - min) + 1L;
+ if (expected != entry.getValue()) {
+ System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue() + " expected=" + expected);
+ success = false;
+ }
+ }
+ }
+ return success;
+ }
+
+ private static boolean verifySum(Map<String, Long> map) {
+ boolean success = true;
+ if (map.isEmpty()) {
+ System.out.println("sum is empty");
+ success = false;
+ } else {
+ System.out.println("verifying sum");
+
+ for (Map.Entry<String, Long> entry : map.entrySet()) {
+ int min = getMin(entry.getKey());
+ int max = getMax(entry.getKey());
+ long expected = ((long) min + (long) max) * (max - min + 1L) / 2L;
+ if (expected != entry.getValue()) {
+ System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue() + " expected=" + expected);
+ success = false;
+ }
+ }
+ }
+ return success;
+ }
+
+ private static boolean verifyAvg(Map<String, Double> map) {
+ boolean success = true;
+ if (map.isEmpty()) {
+ System.out.println("avg is empty");
+ success = false;
+ } else {
+ System.out.println("verifying avg");
+
+ for (Map.Entry<String, Double> entry : map.entrySet()) {
+ int min = getMin(entry.getKey());
+ int max = getMax(entry.getKey());
+ double expected = ((long) min + (long) max) / 2.0;
+
+ if (expected != entry.getValue()) {
+ System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue() + " expected=" + expected);
+ success = false;
+ }
+ }
+ }
+ return success;
+ }
+
+ private static boolean verifyWCnt(Map<String, Long> map) {
+ boolean success = true;
+ if (map.isEmpty()) {
+ System.out.println("wcnt is empty");
+ success = false;
+ } else {
+ System.out.println("verifying wcnt");
+
+ for (Map.Entry<String, Long> entry : map.entrySet()) {
+ long minTime = getMinFromWKey(entry.getKey()) + START_TIME;
+ long maxTime = getMaxFromWKey(entry.getKey()) + START_TIME;
+ long winTime = getStartFromWKey(entry.getKey());
+
+ long expected = WINDOW_SIZE;
+ if (minTime > winTime) expected -= minTime - winTime;
+ if (maxTime < winTime + WINDOW_SIZE - 1) expected -= winTime + WINDOW_SIZE - 1 - maxTime;
+
+ if (expected != entry.getValue()) {
+ System.out.println("fail: key=" + entry.getKey() + " wcnt=" + entry.getValue() + " expected=" + expected);
+ success = false;
+ }
+ }
+ }
+ return success;
+ }
+
+ private static int getMin(String key) {
+ return Integer.parseInt(key.split("-")[0]);
+ }
+
+ private static int getMax(String key) {
+ return Integer.parseInt(key.split("-")[1]);
+ }
+
+ private static int getMinFromWKey(String key) {
+ return getMin(key.split("@")[0]);
+ }
+
+ private static int getMaxFromWKey(String key) {
+ return getMax(key.split("@")[0]);
+ }
+
+ private static long getStartFromWKey(String key) {
+ return Long.parseLong(key.split("@")[1]);
+ }
+
+ private static List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
+ ArrayList<TopicPartition> partitions = new ArrayList<>();
+
+ for (String topic : topics) {
+ for (PartitionInfo info : consumer.partitionsFor(topic)) {
+ partitions.add(new TopicPartition(info.topic(), info.partition()));
+ }
+ }
+ return partitions;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
new file mode 100644
index 0000000..4a13599
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.io.File;
+import java.util.Map;
+
+public class SmokeTestUtil {
+
+ public final static int WINDOW_SIZE = 100;
+ public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
+ public final static int END = Integer.MAX_VALUE;
+
+ public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic) {
+ return printProcessorSupplier(topic, false);
+ }
+
+ public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic, final boolean printOffset) {
+ return new ProcessorSupplier<String, T>() {
+ public Processor<String, T> get() {
+ return new Processor<String, T>() {
+ private int numRecordsProcessed = 0;
+ private ProcessorContext context;
+
+ @Override
+ public void init(ProcessorContext context) {
+ System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ this.context = context;
+ }
+
+ @Override
+ public void process(String key, T value) {
+ if (printOffset) System.out.println(">>> " + context.offset());
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
+ }
+ }
+
+ @Override
+ public void punctuate(long timestamp) {
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+ }
+ };
+ }
+
+ public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
+ public KeyValue<K, V> apply(Windowed<K> winKey, V value) {
+ return new KeyValue<K, V>(winKey.value(), value);
+ }
+ }
+
+ public static Serializer<String> stringSerializer = new StringSerializer();
+
+ public static Deserializer<String> stringDeserializer = new StringDeserializer();
+
+ public static Serializer<Integer> integerSerializer = new IntegerSerializer();
+
+ public static Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
+
+ public static Serializer<Long> longSerializer = new LongSerializer();
+
+ public static Deserializer<Long> longDeserializer = new LongDeserializer();
+
+ public static Serializer<Double> doubleSerializer = new Serializer<Double>() {
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ }
+
+ @Override
+ public byte[] serialize(String topic, Double data) {
+ if (data == null)
+ return null;
+
+ long bits = Double.doubleToLongBits(data);
+ return new byte[] {
+ (byte) (bits >>> 56),
+ (byte) (bits >>> 48),
+ (byte) (bits >>> 40),
+ (byte) (bits >>> 32),
+ (byte) (bits >>> 24),
+ (byte) (bits >>> 16),
+ (byte) (bits >>> 8),
+ (byte) bits
+ };
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+
+ public static Deserializer<Double> doubleDeserializer = new Deserializer<Double>() {
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ }
+
+ @Override
+ public Double deserialize(String topic, byte[] data) {
+ if (data == null)
+ return null;
+ if (data.length != 8) {
+ throw new SerializationException("Size of data received by Deserializer is " +
+ "not 8");
+ }
+
+ long value = 0;
+ for (byte b : data) {
+ value <<= 8;
+ value |= b & 0xFF;
+ }
+ return Double.longBitsToDouble(value);
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+
+ public static File createDir(String path) throws Exception {
+ File dir = new File(path);
+
+ dir.mkdir();
+
+ return dir;
+ }
+
+ public static File createDir(File parent, String child) throws Exception {
+ File dir = new File(parent, child);
+
+ dir.mkdir();
+
+ return dir;
+ }
+
+ public static void sleep(long duration) {
+ try {
+ Thread.sleep(duration);
+ } catch (Exception ex) {
+ //
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
new file mode 100644
index 0000000..a6cd141
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Created by yasuhiro on 2/10/16.
+ */
+public class StreamsSmokeTest {
+
+ /**
+ * args ::= command kafka zookeeper stateDir
+ * command := "run" | "process"
+ *
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ String command = args[0];
+ String kafka = args.length > 1 ? args[1] : null;
+ String zookeeper = args.length > 2 ? args[2] : null;
+ String stateDir = args.length > 3 ? args[3] : null;
+
+ System.out.println("StreamsSmokeTest instance started");
+ System.out.println("command=" + command);
+ System.out.println("kafka=" + kafka);
+ System.out.println("zookeeper=" + zookeeper);
+ System.out.println("stateDir=" + stateDir);
+
+ switch (command) {
+ case "standalone":
+ SmokeTestDriver.main(args);
+ break;
+ case "run":
+ // this starts the driver (data generation and result verification)
+ final int numKeys = 10;
+ final int maxRecordsPerKey = 500;
+ Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+ SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+ break;
+ case "process":
+ // this starts a KafkaStreams client
+ final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka, zookeeper);
+ client.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ client.close();
+ }
+ });
+ break;
+ default:
+ System.out.println("unknown command: " + command);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
new file mode 100644
index 0000000..04e264c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+public class TestTimestampExtractor implements TimestampExtractor {
+
+ private final long base = SmokeTestUtil.START_TIME;
+
+ @Override
+ public long extract(ConsumerRecord<Object, Object> record) {
+ switch (record.topic()) {
+ case "data":
+ return base + (Integer) record.value();
+ default:
+ return System.currentTimeMillis();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
new file mode 100644
index 0000000..192a8d9
--- /dev/null
+++ b/tests/kafkatest/services/streams.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+from ducktape.errors import DucktapeError
+
+from kafkatest.services.kafka.directory import kafka_dir
+import signal, random, requests, os.path, json
+
+class StreamsSmokeTestBaseService(Service):
+ """Base class for Streams Smoke Test services providing some common settings and functionality"""
+
+ PERSISTENT_ROOT = "/mnt/streams"
+ # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
+ LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
+ STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
+ STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
+ LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+ PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
+
+ logs = {
+ "streams_log": {
+ "path": LOG_FILE,
+ "collect_default": True},
+ "streams_stdout": {
+ "path": STDOUT_FILE,
+ "collect_default": True},
+ "streams_stderr": {
+ "path": STDERR_FILE,
+ "collect_default": True},
+ }
+
+ def __init__(self, context, kafka, command):
+ super(StreamsSmokeTestBaseService, self).__init__(context, 1)
+ self.kafka = kafka
+ self.args = { 'command': command }
+
+ @property
+ def node(self):
+ return self.nodes[0]
+
+ def pids(self, node):
+ try:
+ return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
+ except:
+ return []
+
+ def stop_node(self, node, clean_shutdown=True):
+ self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Smoke Test on " + str(node.account))
+ pids = self.pids(node)
+ sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+ for pid in pids:
+ node.account.signal(pid, sig, allow_fail=True)
+ if clean_shutdown:
+ for pid in pids:
+ wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
+
+ node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
+
+ def restart(self):
+ # We don't want to do any clean up here, just restart the process.
+ for node in self.nodes:
+ self.logger.info("Restarting Kafka Streams on " + str(node.account))
+ self.stop_node(node)
+ self.start_node(node)
+
+ def abortThenRestart(self):
+ # We don't want to do any clean up here, just abort then restart the process. The running service is killed immediately.
+ for node in self.nodes:
+ self.logger.info("Aborting Kafka Streams on " + str(node.account))
+ self.stop_node(node, False)
+ self.logger.info("Restarting Kafka Streams on " + str(node.account))
+ self.start_node(node)
+
+ def wait(self):
+ for node in self.nodes:
+ for pid in self.pids(node):
+ wait_until(lambda: not node.account.alive(pid), timeout_sec=120, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
+
+ def clean_node(self, node):
+ node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
+ node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+
+ def start_cmd(self, node):
+ args = self.args.copy()
+ args['kafka'] = self.kafka.bootstrap_servers()
+ args['zk'] = self.kafka.zk.connect_setting()
+ args['state_dir'] = self.PERSISTENT_ROOT
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['kafka_dir'] = kafka_dir(node)
+
+ cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ "/opt/%(kafka_dir)s/bin/streams-smoke-test.sh %(command)s %(kafka)s %(zk)s %(state_dir)s " \
+ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+ return cmd
+
+ def start_node(self, node):
+ node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+
+ node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
+
+ self.logger.info("Starting StreamsSmokeTest process on " + str(node.account))
+ with node.account.monitor_log(self.STDOUT_FILE) as monitor:
+ node.account.ssh(self.start_cmd(node))
+ monitor.wait_until('StreamsSmokeTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsSmokeTest finished startup on " + str(node.account))
+
+ if len(self.pids(node)) == 0:
+ raise RuntimeError("No process ids recorded")
+
+
+class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
+ def __init__(self, context, kafka):
+ super(StreamsSmokeTestDriverService, self).__init__(context, kafka, "run")
+
+class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
+ def __init__(self, context, kafka):
+ super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process")
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/tests/kafkatest/tests/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_bounce_test.py b/tests/kafkatest/tests/streams_bounce_test.py
new file mode 100644
index 0000000..176f010
--- /dev/null
+++ b/tests/kafkatest/tests/streams_bounce_test.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from ducktape.utils.util import wait_until
+import time
+
+class StreamsBounceTest(KafkaTest):
+ """
+ Simple test of Kafka Streams.
+ """
+
+ def __init__(self, test_context):
+ super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
+ 'echo' : { 'partitions': 5, 'replication-factor': 2 },
+ 'data' : { 'partitions': 5, 'replication-factor': 2 },
+ 'min' : { 'partitions': 5, 'replication-factor': 2 },
+ 'max' : { 'partitions': 5, 'replication-factor': 2 },
+ 'sum' : { 'partitions': 5, 'replication-factor': 2 },
+ 'dif' : { 'partitions': 5, 'replication-factor': 2 },
+ 'cnt' : { 'partitions': 5, 'replication-factor': 2 },
+ 'avg' : { 'partitions': 5, 'replication-factor': 2 },
+ 'wcnt' : { 'partitions': 5, 'replication-factor': 2 }
+ })
+
+ self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+ self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+
+ def test_bounce(self):
+ """
+ Start a smoke test client, then abort (kill -9) and restart it a few times.
+ Ensure that all records are delivered.
+ """
+
+ self.driver.start()
+
+ self.processor1.start()
+
+ time.sleep(15);
+
+ self.processor1.abortThenRestart()
+
+ time.sleep(15);
+
+ # enable this after we add change log partition replicas
+ #self.kafka.signal_leader("data")
+
+ time.sleep(15);
+
+ self.processor1.abortThenRestart()
+
+ self.driver.wait()
+ self.driver.stop()
+
+ self.processor1.stop()
+
+ node = self.driver.node
+ node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3358e168/tests/kafkatest/tests/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_smoke_test.py b/tests/kafkatest/tests/streams_smoke_test.py
new file mode 100644
index 0000000..2861837
--- /dev/null
+++ b/tests/kafkatest/tests/streams_smoke_test.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from ducktape.utils.util import wait_until
+import time
+
+class StreamsSmokeTest(KafkaTest):
+ """
+ Simple test of Kafka Streams.
+ """
+
+ def __init__(self, test_context):
+ super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+ 'echo' : { 'partitions': 5, 'replication-factor': 1 },
+ 'data' : { 'partitions': 5, 'replication-factor': 1 },
+ 'min' : { 'partitions': 5, 'replication-factor': 1 },
+ 'max' : { 'partitions': 5, 'replication-factor': 1 },
+ 'sum' : { 'partitions': 5, 'replication-factor': 1 },
+ 'dif' : { 'partitions': 5, 'replication-factor': 1 },
+ 'cnt' : { 'partitions': 5, 'replication-factor': 1 },
+ 'avg' : { 'partitions': 5, 'replication-factor': 1 },
+ 'wcnt' : { 'partitions': 5, 'replication-factor': 1 }
+ })
+
+ self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+ self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+ self.processor2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+ self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+ self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+
+ def test_streams(self):
+ """
+ Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.
+ Ensure that all results (stats on values computed by Kafka Streams) are correct.
+ """
+
+ self.driver.start()
+
+ self.processor1.start()
+ self.processor2.start()
+
+ time.sleep(15);
+
+ self.processor3.start()
+ self.processor1.stop()
+
+ time.sleep(15);
+
+ self.processor4.start();
+
+ self.driver.wait()
+ self.driver.stop()
+
+ self.processor2.stop()
+ self.processor3.stop()
+ self.processor4.stop()
+
+ node = self.driver.node
+ node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
|