From: guozhang@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Subject: kafka git commit: MINOR: enhance streams system test
Message-Id: <2043d68086be4c8c905cbcae18ddd4be@git.apache.org>
Date: Thu, 25 Feb 2016 02:11:40 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk fa05752cc -> 13b8fb295


MINOR: enhance streams system test

guozhangwang

* add table aggregate to the system test
* actually create change log partition replica

Author: Yasuhiro Matsuda
Reviewers: Guozhang Wang

Closes #966 from ymatsuda/enh_systest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13b8fb29
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13b8fb29
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13b8fb29

Branch: refs/heads/trunk
Commit: 13b8fb295c3becc27e7954af623fe90b3062409e
Parents: fa05752
Author: Yasuhiro Matsuda
Authored: Wed Feb 24 18:11:36 2016 -0800
Committer: Guozhang Wang
Committed: Wed Feb 24 18:11:36 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |   2 -
 .../internals/InternalTopicManager.java         |   8 +-
 .../internals/ProcessorStateManager.java        |  13 ++-
 .../internals/StreamPartitionAssignor.java      |   9 +-
 .../processor/internals/StreamThread.java       |   4 +-
 .../streams/smoketest/SmokeTestClient.java      |  17 +++
 .../streams/smoketest/SmokeTestDriver.java      | 111 ++++++++++++++++---
 .../kafka/streams/smoketest/SmokeTestUtil.java  |  41 +++++++
 .../streams/smoketest/StreamsSmokeTest.java     |   3 -
 tests/kafkatest/tests/streams_bounce_test.py    |   5 +-
 tests/kafkatest/tests/streams_smoke_test.py     |   3 +-
 11 files changed, 182 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
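For readers tracking the "actually create change log partition replica" part of this change: internal change log topics were previously created with a hard-coded replication factor of 1 (see the old makeReady(topic, numPartitions, 1) call in StreamPartitionAssignor below); they now use StreamsConfig.REPLICATION_FACTOR_CONFIG. A minimal sketch, not part of this commit, of how an application would supply the setting (class name, topics, and addresses are made up for illustration; the value 2 mirrors what SmokeTestClient sets further down, and other per-version required settings such as default serializers are omitted):

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ReplicationFactorExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.JOB_ID_CONFIG, "replication-example");          // hypothetical job id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // hypothetical broker address
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");    // hypothetical ZooKeeper address
            // After this commit, internal change log topics are created with this
            // replication factor instead of the previously hard-coded 1.
            props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("data").to("echo");                                      // trivial topology, for illustration only

            new KafkaStreams(builder, props).start();
        }
    }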
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0cf2888..71d1a6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -293,11 +293,9 @@ public class StreamsConfig extends AbstractConfig {
 
     private void removeStreamsSpecificConfigs(Map props) {
         props.remove(StreamsConfig.JOB_ID_CONFIG);
-        props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG);
         props.remove(StreamsConfig.STATE_DIR_CONFIG);
         props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
         props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
-        props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
         props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
         props.remove(InternalConfig.STREAM_THREAD_INSTANCE);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 3768260..ce95bb0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -47,6 +47,7 @@ public class InternalTopicManager {
     private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
 
     private final ZkClient zkClient;
+    private final int replicationFactor;
 
     private class ZKStringSerializer implements ZkSerializer {
@@ -72,11 +73,12 @@ public class InternalTopicManager {
         }
     }
 
-    public InternalTopicManager(String zkConnect) {
-        zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+    public InternalTopicManager(String zkConnect, int replicationFactor) {
+        this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+        this.replicationFactor = replicationFactor;
     }
 
-    public void makeReady(String topic, int numPartitions, int replicationFactor) {
+    public void makeReady(String topic, int numPartitions) {
         boolean topicNotReady = true;
 
         while (topicNotReady) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 30441c5..bae30e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -217,18 +217,23 @@ public class ProcessorStateManager {
             restoreConsumer.seekToBeginning(storePartition);
         }
 
-        // restore its state from changelog records; while restoring the log end offset
-        // should not change since it is only written by this thread.
+        // restore its state from changelog records
         long limit = offsetLimit(storePartition);
         while (true) {
+            long offset = 0L;
             for (ConsumerRecord record : restoreConsumer.poll(100).records(storePartition)) {
-                if (record.offset() >= limit) break;
+                offset = record.offset();
+                if (offset >= limit) break;
                 stateRestoreCallback.restore(record.key(), record.value());
             }
-            if (restoreConsumer.position(storePartition) == endOffset) {
+            if (offset >= limit) {
+                break;
+            } else if (restoreConsumer.position(storePartition) == endOffset) {
                 break;
             } else if (restoreConsumer.position(storePartition) > endOffset) {
+                // For a logging enabled changelog (no offset limit),
+                // the log end offset should not change while restoring since it is only written by this thread.
                 throw new IllegalStateException("Log end offset should not change while restoring");
             }
         }
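The restore loop above now has three ways to stop instead of one. A small standalone sketch, not the actual ProcessorStateManager code and with illustrative names, of the decision it makes after each poll:

    // Minimal sketch of the loop's exit conditions, assuming the same quantities it tracks.
    static boolean doneRestoring(long lastRestoredOffset, long offsetLimit, long position, long endOffset) {
        if (lastRestoredOffset >= offsetLimit)
            return true;                  // reached the offset limit, so stop even though more records may exist
        if (position == endOffset)
            return true;                  // caught up with the changelog's end offset
        if (position > endOffset)
            // for a logging enabled changelog only the owning thread writes it, so overshooting is a bug
            throw new IllegalStateException("Log end offset should not change while restoring");
        return false;                     // otherwise keep polling and restoring
    }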
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index f49601c..440efc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -115,8 +115,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         this.topicGroups = streamThread.builder.topicGroups();
 
-        if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG))
-            internalTopicManager = new InternalTopicManager((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG));
+        if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
+            internalTopicManager = new InternalTopicManager(
+                    (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
+                    (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
+        }
     }
 
     @Override
@@ -289,7 +292,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                         numPartitions = task.partition + 1;
                 }
 
-                internalTopicManager.makeReady(topic, numPartitions, 1);
+                internalTopicManager.makeReady(topic, numPartitions);
 
                 // wait until the topic metadata has been propagated to all brokers
                 List partitions;

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 10e458a..7d460e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -283,6 +283,9 @@ public class StreamThread extends Thread {
             // already logged in commitAll()
         }
 
+        // Close standby tasks before closing the restore consumer since closing standby tasks uses the restore consumer.
+        removeStandbyTasks();
+
         // We need to first close the underlying clients before closing the state
         // manager, for example we need to make sure producer's message sends
         // have all been acked before the state manager records
@@ -304,7 +307,6 @@ public class StreamThread extends Thread {
         }
 
         removeStreamTasks();
-        removeStandbyTasks();
 
         log.info("Stream thread shutdown complete [" + this.getName() + "]");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 7f1b343..fec447f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -93,6 +93,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         KStreamBuilder builder = new KStreamBuilder();
@@ -235,6 +236,22 @@ public class SmokeTestClient extends SmokeTestUtil {
                 }
         ).to("wcnt", stringSerializer, longSerializer);
 
+        // test repartition
+        Agg agg = new Agg();
+        cntTable.aggregate(
+                agg.init(),
+                agg.adder(),
+                agg.remover(),
+                agg.selector(),
+                stringSerializer,
+                longSerializer,
+                longSerializer,
+                stringDeserializer,
+                longDeserializer,
+                longDeserializer,
+                "cntByCnt"
+        ).to("tagg", stringSerializer, longSerializer);
+
         return new KafkaStreams(builder, props);
     }
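The "test repartition" block above takes cntTable (key -> number of records seen for that key), re-keys each entry by its count via Agg.selector(), and adds up how many keys share each count; the result is written to the new "tagg" topic. A plain-Java sketch, not part of the commit and with made-up keys and counts, of the same computation:

    import java.util.HashMap;
    import java.util.Map;

    public class TaggSketch {
        public static void main(String[] args) {
            // hypothetical key -> record-count entries, standing in for cntTable
            Map<String, Long> cntTable = new HashMap<>();
            cntTable.put("0-9", 10L);
            cntTable.put("10-19", 10L);
            cntTable.put("20-24", 5L);

            Map<String, Long> tagg = new HashMap<>();
            for (Long count : cntTable.values()) {
                String newKey = Long.toString(count);                   // Agg.selector(): re-key by the count
                Long current = tagg.get(newKey);
                tagg.put(newKey, current == null ? 1L : current + 1L);  // Agg.init()/adder(): start at 0, add 1 per key
            }

            System.out.println(tagg);  // two keys have 10 records, one key has 5
        }
    }

Because the selector changes the key, the aggregation has to repartition through an internal topic, which is what the comment and this system test exercise.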
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index e56a369..c0a6f46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -198,7 +198,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 
         KafkaConsumer consumer = new KafkaConsumer<>(props);
-        List partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt");
+        List partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions.toArray(new TopicPartition[partitions.size()]));
 
@@ -212,6 +212,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         HashMap cnt = new HashMap<>();
         HashMap avg = new HashMap<>();
         HashMap wcnt = new HashMap<>();
+        HashMap tagg = new HashMap<>();
 
         HashSet keys = new HashSet<>();
         HashMap> received = new HashMap<>();
@@ -268,6 +269,9 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     case "wcnt":
                         wcnt.put(key, longDeserializer.deserialize("", record.value()));
                         break;
+                    case "tagg":
+                        tagg.put(key, longDeserializer.deserialize("", record.value()));
+                        break;
                     default:
                         System.out.println("unknown topic: " + record.topic());
                 }
 
@@ -301,18 +305,19 @@ public class SmokeTestDriver extends SmokeTestUtil {
             System.out.println("missedRecords=" + missedCount);
         }
 
-        success &= verifyMin(min);
-        success &= verifyMax(max);
-        success &= verifyDif(dif);
-        success &= verifySum(sum);
-        success &= verifyCnt(cnt);
-        success &= verifyAvg(avg);
-        success &= verifyWCnt(wcnt);
+        success &= verifyMin(min, allData);
+        success &= verifyMax(max, allData);
+        success &= verifyDif(dif, allData);
+        success &= verifySum(sum, allData);
+        success &= verifyCnt(cnt, allData);
+        success &= verifyAvg(avg, allData);
+        success &= verifyWCnt(wcnt, allData);
+        success &= verifyTAgg(tagg, allData);
 
         System.out.println(success ? "SUCCESS" : "FAILURE");
     }
 
-    private static boolean verifyMin(Map map) {
+    private static boolean verifyMin(Map map, Map> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("min is empty");
@@ -320,6 +325,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying min");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry entry : map.entrySet()) {
                 int expected = getMin(entry.getKey());
                 if (expected != entry.getValue()) {
@@ -331,7 +340,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyMax(Map map) {
+    private static boolean verifyMax(Map map, Map> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("max is empty");
@@ -339,6 +348,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying max");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry entry : map.entrySet()) {
                 int expected = getMax(entry.getKey());
                 if (expected != entry.getValue()) {
@@ -350,7 +363,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyDif(Map map) {
+    private static boolean verifyDif(Map map, Map> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("dif is empty");
@@ -358,6 +371,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying dif");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
@@ -371,7 +388,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyCnt(Map map) {
+    private static boolean verifyCnt(Map map, Map> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("cnt is empty");
@@ -379,6 +396,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying cnt");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
@@ -392,7 +413,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifySum(Map map) {
+    private static boolean verifySum(Map map, Map> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("sum is empty");
@@ -400,6 +421,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying sum");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
@@ -413,7 +438,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyAvg(Map map) {
+    private static boolean verifyAvg(Map map, Map> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("avg is empty");
@@ -421,6 +446,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying avg");
 
+            if (map.size() != allData.size()) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                success = false;
+            }
             for (Map.Entry entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
@@ -435,7 +464,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
-    private static boolean verifyWCnt(Map map) {
+    private static boolean verifyWCnt(Map map, Map> allData) {
         boolean success = true;
         if (map.isEmpty()) {
             System.out.println("wcnt is empty");
@@ -443,6 +472,17 @@ public class SmokeTestDriver extends SmokeTestUtil {
         } else {
             System.out.println("verifying wcnt");
 
+            int expectedSize = 0;
+            for (Set values : allData.values()) {
+                int maxValue = Collections.max(values);
+                int minValue = Collections.min(values);
+                expectedSize += maxValue / WINDOW_SIZE + 1;
+                expectedSize -= minValue / WINDOW_SIZE;
+            }
+            if (map.size() != expectedSize) {
+                System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + expectedSize);
+                success = false;
+            }
             for (Map.Entry entry : map.entrySet()) {
                 long minTime = getMinFromWKey(entry.getKey()) + START_TIME;
                 long maxTime = getMaxFromWKey(entry.getKey()) + START_TIME;
@@ -461,6 +501,47 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return success;
     }
 
+    private static boolean verifyTAgg(Map map, Map> allData) {
+        boolean success = true;
+        if (map.isEmpty()) {
+            System.out.println("tagg is empty");
+            success = false;
+        } else {
+            System.out.println("verifying tagg");
+
+            // generate expected answer
+            Map expected = new HashMap<>();
+            for (String key : allData.keySet()) {
+                int min = getMin(key);
+                int max = getMax(key);
+                String cnt = Long.toString(max - min + 1L);
+
+                if (expected.containsKey(cnt)) {
+                    expected.put(cnt, expected.get(cnt) + 1L);
+                } else {
+                    expected.put(cnt, 1L);
+                }
+            }
+
+            // check the result
+            for (Map.Entry entry : map.entrySet()) {
+                String key = entry.getKey();
+                Long expectedCount = expected.remove(key);
+                if (expectedCount == null)
+                    expectedCount = 0L;
+
+                if (entry.getValue() != expectedCount) {
+                    System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
+                    success = false;
+                }
+            }
+            for (Map.Entry entry : expected.entrySet()) {
+                System.out.println("fail: missingKey=" + entry.getKey() + " expected=" + entry.getValue());
+            }
+        }
+        return success;
+    }
+
     private static int getMin(String key) {
         return Integer.parseInt(key.split("-")[0]);
     }
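The expectedSize arithmetic added to verifyWCnt above counts, per key, how many windows that key's values can fall into: maxValue / WINDOW_SIZE + 1 - minValue / WINDOW_SIZE. As a worked example with a hypothetical WINDOW_SIZE of 100, a key whose values run from 250 to 780 lands in the windows starting at 200, 300, ..., 700, and 780 / 100 + 1 - 250 / 100 = 7 + 1 - 2 = 6 windows. Summing that over every key in allData gives the number of distinct windowed keys the "wcnt" topic should contain, which is then compared against the size of the received map.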
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index 4a13599..3f5503f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Processor;
@@ -87,6 +89,45 @@ public class SmokeTestUtil {
         }
     }
 
+    public static class Agg {
+
+        public KeyValueMapper> selector() {
+            return new KeyValueMapper>() {
+                @Override
+                public KeyValue apply(String key, Long value) {
+                    return new KeyValue<>(Long.toString(value), 1L);
+                }
+            };
+        }
+
+        public Initializer init() {
+            return new Initializer() {
+                @Override
+                public Long apply() {
+                    return 0L;
+                }
+            };
+        }
+
+        public Aggregator adder() {
+            return new Aggregator() {
+                @Override
+                public Long apply(String aggKey, Long value, Long aggregate) {
+                    return aggregate + value;
+                }
+            };
+        }
+
+        public Aggregator remover() {
+            return new Aggregator() {
+                @Override
+                public Long apply(String aggKey, Long value, Long aggregate) {
+                    return aggregate - value;
+                }
+            };
+        }
+    }
+
     public static Serializer stringSerializer = new StringSerializer();
 
     public static Deserializer stringDeserializer = new StringDeserializer();
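A note on why Agg supplies both an adder and a remover: a KTable aggregate is maintained over a stream of updates, so when the count for a key changes, the key's old contribution first has to be subtracted from the group it previously mapped to before the new value is added to its (possibly different) group. For example, if a key's count moves from 9 to 10, the remover fires for group "9" (its tally drops by one) and the adder fires for group "10" (its tally rises by one); without the remover, stale contributions would be double-counted. The numbers here are illustrative only.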
http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
index a6cd141..c26544e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -21,9 +21,6 @@ import java.io.File;
 import java.util.Map;
 import java.util.Set;
 
-/**
- * Created by yasuhiro on 2/10/16.
- */
 public class StreamsSmokeTest {
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/tests/kafkatest/tests/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_bounce_test.py b/tests/kafkatest/tests/streams_bounce_test.py
index 176f010..2b9c4d6 100644
--- a/tests/kafkatest/tests/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams_bounce_test.py
@@ -33,7 +33,8 @@ class StreamsBounceTest(KafkaTest):
                                        'dif' : { 'partitions': 5, 'replication-factor': 2 },
                                        'cnt' : { 'partitions': 5, 'replication-factor': 2 },
                                        'avg' : { 'partitions': 5, 'replication-factor': 2 },
-                                       'wcnt' : { 'partitions': 5, 'replication-factor': 2 }
+                                       'wcnt' : { 'partitions': 5, 'replication-factor': 2 },
+                                       'tagg' : { 'partitions': 5, 'replication-factor': 2 }
                                        })
 
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
@@ -58,7 +59,7 @@ class StreamsBounceTest(KafkaTest):
         # enable this after we add change log partition replicas
         #self.kafka.signal_leader("data")
 
-        time.sleep(15);
+        #time.sleep(15);
 
         self.processor1.abortThenRestart()

http://git-wip-us.apache.org/repos/asf/kafka/blob/13b8fb29/tests/kafkatest/tests/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_smoke_test.py b/tests/kafkatest/tests/streams_smoke_test.py
index 2861837..48e4db8 100644
--- a/tests/kafkatest/tests/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams_smoke_test.py
@@ -33,7 +33,8 @@ class StreamsSmokeTest(KafkaTest):
                                        'dif' : { 'partitions': 5, 'replication-factor': 1 },
                                        'cnt' : { 'partitions': 5, 'replication-factor': 1 },
                                        'avg' : { 'partitions': 5, 'replication-factor': 1 },
-                                       'wcnt' : { 'partitions': 5, 'replication-factor': 1 }
+                                       'wcnt' : { 'partitions': 5, 'replication-factor': 1 },
+                                       'tagg' : { 'partitions': 5, 'replication-factor': 1 }
                                        })
 
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)