kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: MINOR: add test for StreamsSmokeTestDriver (#6231)
Date Fri, 15 Feb 2019 16:24:16 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3656ad9  MINOR: add test for StreamsSmokeTestDriver (#6231)
3656ad9 is described below

commit 3656ad93bc7fb74138e36d39575c3b86c319577b
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Fri Feb 15 10:23:57 2019 -0600

    MINOR: add test for StreamsSmokeTestDriver (#6231)
    
    * MINOR: add test for StreamsSmokeTestDriver
    Hi @bbejeck @mjsax @ableegoldman @guozhangwang ,
    
    Please take a look at this when you get the chance.
    
    The primary concern is adding the test. It will help us verify changes to the smoke test (such as adding suppression).
    
    I've also added some extra output to the smoke test stdout, which will hopefully aid us in diagnosing the flaky tests.
    
    Finally, I bundled in some cleanup. It was my intention to do that in a separate PR, but it wound up getting smashed together during refactoring.
    
    Please let me know if you'd prefer for me to pull any of these out into a separate request.
    
    Thanks,
    -John
    
    Also, add more output for debuggability
    
    * cleanup
    
    * cleanup
    
    * refactor
    
    * refactor
    
    * remove redundant printlns
    
    * Update EmbeddedKafkaCluster.java
    
    * move to integration package
    
    * replace early-exit on pass
    
    * use classrule for embedded kafka
    
    * pull in smoke test improvements from side branch
    
    * try-with-resources
    
    * format events instead of printing long lines
    
    * minor formatting fix
    
    Reviewers:  Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>
---
 .../SmokeTestDriverIntegrationTest.java            | 134 +++++
 .../kafka/streams/tests/SmokeTestClient.java       | 189 +++----
 .../kafka/streams/tests/SmokeTestDriver.java       | 564 ++++++++-------------
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |   7 +-
 .../kafka/streams/tests/StreamsSmokeTest.java      |  10 +-
 5 files changed, 438 insertions(+), 466 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
new file mode 100644
index 0000000..82f86c2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.tests.SmokeTestClient;
+import org.apache.kafka.streams.tests.SmokeTestDriver;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
+
+public class SmokeTestDriverIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+
+
+    private static class Driver extends Thread {
+        private String bootstrapServers;
+        private int numKeys;
+        private int maxRecordsPerKey;
+        private Exception exception = null;
+        private SmokeTestDriver.VerificationResult result;
+
+        private Driver(final String bootstrapServers, final int numKeys, final int maxRecordsPerKey) {
+            this.bootstrapServers = bootstrapServers;
+            this.numKeys = numKeys;
+            this.maxRecordsPerKey = maxRecordsPerKey;
+        }
+
+        @Override
+        public void run() {
+            try {
+                final Map<String, Set<Integer>> allData = generate(bootstrapServers, numKeys, maxRecordsPerKey, true);
+                result = verify(bootstrapServers, allData, maxRecordsPerKey);
+
+            } catch (final Exception ex) {
+                this.exception = ex;
+            }
+        }
+
+        public Exception exception() {
+            return exception;
+        }
+
+        SmokeTestDriver.VerificationResult result() {
+            return result;
+        }
+
+    }
+
+    @Test
+    public void shouldWorkWithRebalance() throws InterruptedException {
+        int numClientsCreated = 0;
+        final ArrayList<SmokeTestClient> clients = new ArrayList<>();
+
+        CLUSTER.createTopics(SmokeTestDriver.topics());
+
+        final String bootstrapServers = CLUSTER.bootstrapServers();
+        final Driver driver = new Driver(bootstrapServers, 10, 1000);
+        driver.start();
+        System.out.println("started driver");
+
+
+        final Properties props = new Properties();
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        // cycle out Streams instances as long as the test is running.
+        while (driver.isAlive()) {
+            // take a nap
+            Thread.sleep(1000);
+
+            // add a new client
+            final SmokeTestClient smokeTestClient = new SmokeTestClient("streams-" + numClientsCreated++);
+            clients.add(smokeTestClient);
+            smokeTestClient.start(props);
+
+            while (!clients.get(clients.size() - 1).started()) {
+                Thread.sleep(100);
+            }
+
+            // let the oldest client die of "natural causes"
+            if (clients.size() >= 3) {
+                clients.remove(0).closeAsync();
+            }
+        }
+        try {
+            // wait for verification to finish
+            driver.join();
+
+
+        } finally {
+            // whether or not the assertions failed, tell all the streams instances to stop
+            for (final SmokeTestClient client : clients) {
+                client.closeAsync();
+            }
+
+            // then, wait for them to stop
+            for (final SmokeTestClient client : clients) {
+                client.close();
+            }
+        }
+
+        // check to make sure that it actually succeeded
+        if (driver.exception() != null) {
+            driver.exception().printStackTrace();
+            throw new AssertionError(driver.exception());
+        }
+        Assert.assertTrue(driver.result().result(), driver.result().passed());
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index a396ad1..a8f974a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,89 +16,84 @@
  */
 package org.apache.kafka.streams.tests;
 
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
 
+import java.time.Duration;
 import java.util.Properties;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
-    private final Properties streamsProperties;
+    private final String name;
 
     private Thread thread;
     private KafkaStreams streams;
     private boolean uncaughtException = false;
+    private boolean started;
 
-    public SmokeTestClient(final Properties streamsProperties) {
+    public SmokeTestClient(final String name) {
         super();
-        this.streamsProperties = streamsProperties;
+        this.name = name;
     }
 
-    public void start() {
+    public boolean started() {
+        return started;
+    }
+
+    public void start(final Properties streamsProperties) {
         streams = createKafkaStreams(streamsProperties);
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
-                uncaughtException = true;
-                e.printStackTrace();
-            }
+        streams.setUncaughtExceptionHandler((t, e) -> {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            uncaughtException = true;
+            e.printStackTrace();
         });
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                close();
-            }
-        }));
+        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
 
-        thread = new Thread() {
-            public void run() {
-                streams.start();
-            }
-        };
+        thread = new Thread(() -> streams.start());
         thread.start();
     }
 
+    public void closeAsync() {
+        streams.close(Duration.ZERO);
+    }
+
     public void close() {
         streams.close(Duration.ofSeconds(5));
         // do not remove these printouts since they are needed for health scripts
         if (!uncaughtException) {
-            System.out.println("SMOKE-TEST-CLIENT-CLOSED");
+            System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
         }
         try {
             thread.join();
         } catch (final Exception ex) {
             // do not remove these printouts since they are needed for health scripts
-            System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
             // ignore
         }
     }
 
-    private static Properties getStreamsConfig(final Properties props) {
+    private Properties getStreamsConfig(final Properties props) {
         final Properties fullProps = new Properties(props);
         fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
+        fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
         fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
@@ -106,23 +101,35 @@ public class SmokeTestClient extends SmokeTestUtil {
         fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
         fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
-
+        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         fullProps.putAll(props);
         return fullProps;
     }
 
-    private static KafkaStreams createKafkaStreams(final Properties props) {
+    private KafkaStreams createKafkaStreams(final Properties props) {
+        final Topology build = getTopology();
+        final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
+        streamsClient.setStateListener((newState, oldState) -> {
+            System.out.printf("%s: %s -> %s%n", name, oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+            }
+        });
+        streamsClient.setUncaughtExceptionHandler((t, e) -> {
+            System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+            streamsClient.close(Duration.ofSeconds(30));
+        });
+
+        return streamsClient;
+    }
+
+    public Topology getTopology() {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
         final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
         source.to("echo", Produced.with(stringSerde, intSerde));
-        final KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return value == null || value != END;
-            }
-        });
-        data.process(SmokeTestUtil.printProcessorSupplier("data"));
+        final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
+        data.process(SmokeTestUtil.printProcessorSupplier("data", name));
 
         // min
         final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
@@ -130,97 +137,66 @@ public class SmokeTestClient extends SmokeTestUtil {
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(1)))
             .aggregate(
-                new Initializer<Integer>() {
-                    public Integer apply() {
-                        return Integer.MAX_VALUE;
-                    }
-                },
-                new Aggregator<String, Integer, Integer>() {
-                    @Override
-                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
-                        return (value < aggregate) ? value : aggregate;
-                    }
-                },
+                () -> Integer.MAX_VALUE,
+                (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
                 Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min").withValueSerde(intSerde))
-            .toStream(new Unwindow<String, Integer>())
+            .toStream(new Unwindow<>())
             .to("min", Produced.with(stringSerde, intSerde));
 
         final KTable<String, Integer> minTable = builder.table(
             "min",
             Consumed.with(stringSerde, intSerde),
-            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("minStoreName"));
-        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
+            Materialized.as("minStoreName"));
+        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name));
 
         // max
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
             .aggregate(
-                new Initializer<Integer>() {
-                    public Integer apply() {
-                        return Integer.MIN_VALUE;
-                    }
-                },
-                new Aggregator<String, Integer, Integer>() {
-                    @Override
-                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
-                        return (value > aggregate) ? value : aggregate;
-                    }
-                },
+                () -> Integer.MIN_VALUE,
+                (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
                 Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
-            .toStream(new Unwindow<String, Integer>())
+            .toStream(new Unwindow<>())
             .to("max", Produced.with(stringSerde, intSerde));
 
         final KTable<String, Integer> maxTable = builder.table(
             "max",
             Consumed.with(stringSerde, intSerde),
-            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("maxStoreName"));
-        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
+            Materialized.as("maxStoreName"));
+        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name));
 
         // sum
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
             .aggregate(
-                new Initializer<Long>() {
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<String, Integer, Long>() {
-                    @Override
-                    public Long apply(final String aggKey, final Integer value, final Long aggregate) {
-                        return (long) value + aggregate;
-                    }
-                },
+                () -> 0L,
+                (aggKey, value, aggregate) -> (long) value + aggregate,
                 Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
-            .toStream(new Unwindow<String, Long>())
+            .toStream(new Unwindow<>())
             .to("sum", Produced.with(stringSerde, longSerde));
 
         final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
         final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
-        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
+        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name));
 
         // cnt
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
-            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("uwin-cnt"))
-            .toStream(new Unwindow<String, Long>())
+            .count(Materialized.as("uwin-cnt"))
+            .toStream(new Unwindow<>())
             .to("cnt", Produced.with(stringSerde, longSerde));
 
         final KTable<String, Long> cntTable = builder.table(
             "cnt",
             Consumed.with(stringSerde, longSerde),
-            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cntStoreName"));
-        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
+            Materialized.as("cntStoreName"));
+        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name));
 
         // dif
         maxTable
             .join(
                 minTable,
-                new ValueJoiner<Integer, Integer, Integer>() {
-                    public Integer apply(final Integer value1, final Integer value2) {
-                        return value1 - value2;
-                    }
-                })
+                (value1, value2) -> value1 - value2)
             .toStream()
             .to("dif", Produced.with(stringSerde, intSerde));
 
@@ -228,33 +204,20 @@ public class SmokeTestClient extends SmokeTestUtil {
         sumTable
             .join(
                 cntTable,
-                new ValueJoiner<Long, Long, Double>() {
-                    public Double apply(final Long value1, final Long value2) {
-                        return (double) value1 / (double) value2;
-                    }
-                })
+                (value1, value2) -> (double) value1 / (double) value2)
             .toStream()
             .to("avg", Produced.with(stringSerde, doubleSerde));
 
         // test repartition
         final Agg agg = new Agg();
         cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
-            .aggregate(agg.init(), agg.adder(), agg.remover(),
-                Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
-                    .withKeySerde(Serdes.String())
-                    .withValueSerde(Serdes.Long()))
-            .toStream()
-            .to("tagg", Produced.with(stringSerde, longSerde));
-
-        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(props));
-        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-                streamsClient.close(Duration.ofSeconds(30));
-            }
-        });
-
-        return streamsClient;
+                .aggregate(agg.init(), agg.adder(), agg.remover(),
+                           Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
+                               .withKeySerde(Serdes.String())
+                               .withValueSerde(Serdes.Long()))
+                .toStream()
+                .to("tagg", Produced.with(stringSerde, longSerde));
+
+        return builder.build();
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 078cbe4..de40818 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -28,29 +28,51 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.test.TestUtils;
 
-import java.io.File;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-public class SmokeTestDriver extends SmokeTestUtil {
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
 
-    private static final int MAX_RECORD_EMPTY_RETRIES = 60;
+public class SmokeTestDriver extends SmokeTestUtil {
+    private static final String[] TOPICS = new String[] {
+        "data",
+        "echo",
+        "max",
+        "min", "min-suppressed", "min-raw",
+        "dif",
+        "sum",
+        "cnt",
+        "avg",
+        "tagg"
+    };
+
+    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
 
     private static class ValueList {
         public final String key;
@@ -77,69 +99,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
     }
 
-    // This main() is not used by the system test. It is intended to be used for local debugging.
-    public static void main(final String[] args) throws InterruptedException {
-        final String kafka = "localhost:9092";
-        final File stateDir = TestUtils.tempDirectory();
-
-        final int numKeys = 20;
-        final int maxRecordsPerKey = 1000;
-
-        final Thread driver = new Thread(() -> {
-            try {
-                final Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
-                verify(kafka, allData, maxRecordsPerKey);
-            } catch (final Exception ex) {
-                ex.printStackTrace();
-            }
-        });
-
-        final Properties props = new Properties();
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "1").getAbsolutePath());
-        final SmokeTestClient streams1 = new SmokeTestClient(props);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "2").getAbsolutePath());
-        final SmokeTestClient streams2 = new SmokeTestClient(props);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "3").getAbsolutePath());
-        final SmokeTestClient streams3 = new SmokeTestClient(props);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "4").getAbsolutePath());
-        final SmokeTestClient streams4 = new SmokeTestClient(props);
-
-        System.out.println("starting the driver");
-        driver.start();
-
-        System.out.println("starting the first and second client");
-        streams1.start();
-        streams2.start();
-
-        sleep(10000);
-
-        System.out.println("starting the third client");
-        streams3.start();
-
-        System.out.println("closing the first client");
-        streams1.close();
-        System.out.println("closed the first client");
-
-        sleep(10000);
-
-        System.out.println("starting the forth client");
-        streams4.start();
-
-        driver.join();
-
-        System.out.println("driver stopped");
-        streams2.close();
-        streams3.close();
-        streams4.close();
-
-        System.out.println("shutdown");
-    }
-
-    public static Map<String, Set<Integer>> generate(final String kafka,
-                                                     final int numKeys,
-                                                     final int maxRecordsPerKey) {
-        return generate(kafka, numKeys, maxRecordsPerKey, true);
+    public static String[] topics() {
+        return Arrays.copyOf(TOPICS, TOPICS.length);
     }
 
     public static Map<String, Set<Integer>> generate(final String kafka,
@@ -183,7 +144,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
             } else {
 
                 final ProducerRecord<byte[], byte[]> record =
-                    new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
+                    new ProducerRecord<>(
+                        "data",
+                        stringSerde.serializer().serialize("", key),
+                        intSerde.serializer().serialize("", value)
+                    );
 
                 producer.send(record, new TestCallback(record, needRetry));
 
@@ -201,6 +166,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         while (!needRetry.isEmpty()) {
             final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
             for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
                 producer.send(record, new TestCallback(record, needRetry2));
             }
             producer.flush();
@@ -239,7 +205,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
     }
 
-    private static void shuffle(final int[] data, final int windowSize) {
+    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
         final Random rand = new Random();
         for (int i = 0; i < data.length; i++) {
             // we shuffle data within windowSize
@@ -252,93 +218,104 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
     }
 
-    public static void verify(final String kafka, final Map<String, Set<Integer>> allData, final int maxRecordsPerKey) {
+    public static class NumberDeserializer implements Deserializer<Number> {
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) {
+
+        }
+
+        @Override
+        public Number deserialize(final String topic, final byte[] data) {
+            final Number value;
+            switch (topic) {
+                case "data":
+                case "echo":
+                case "min":
+                case "min-raw":
+                case "min-suppressed":
+                case "max":
+                case "dif":
+                    value = intSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "sum":
+                case "cnt":
+                case "tagg":
+                    value = longSerde.deserializer().deserialize(topic, data);
+                    break;
+                case "avg":
+                    value = doubleSerde.deserializer().deserialize(topic, data);
+                    break;
+                default:
+                    throw new RuntimeException("unknown topic: " + topic);
+            }
+            return value;
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey) {
         final Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
 
-        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
+        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
-        final int recordsGenerated = allData.size() * maxRecordsPerKey;
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         int recordsProcessed = 0;
+        final Map<String, AtomicInteger> processed =
+            Stream.of(TOPICS)
+                  .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
-        final HashMap<String, Integer> max = new HashMap<>();
-        final HashMap<String, Integer> min = new HashMap<>();
-        final HashMap<String, Integer> dif = new HashMap<>();
-        final HashMap<String, Long> sum = new HashMap<>();
-        final HashMap<String, Long> cnt = new HashMap<>();
-        final HashMap<String, Double> avg = new HashMap<>();
-        final HashMap<String, Long> wcnt = new HashMap<>();
-        final HashMap<String, Long> tagg = new HashMap<>();
-
-        final HashSet<String> keys = new HashSet<>();
-        final HashMap<String, Set<Integer>> received = new HashMap<>();
-        for (final String key : allData.keySet()) {
-            keys.add(key);
-            received.put(key, new HashSet<Integer>());
-        }
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
+            final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(1));
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
-                if (verifyMin(min, allData, false)
-                    && verifyMax(max, allData, false)
-                    && verifyDif(dif, allData, false)
-                    && verifySum(sum, allData, false)
-                    && verifyCnt(cnt, allData, false)
-                    && verifyAvg(avg, allData, false)
-                    && verifyTAgg(tagg, allData, false)) {
+                verificationResult = verifyAll(inputs, events);
+                if (verificationResult.passed()) {
                     break;
-                }
-                if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                    System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
                     break;
+                } else {
+                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying...");
                 }
             } else {
-                for (final ConsumerRecord<byte[], byte[]> record : records) {
-                    final String key = stringSerde.deserializer().deserialize("", record.key());
-                    switch (record.topic()) {
-                        case "echo":
-                            final Integer value = intSerde.deserializer().deserialize("", record.value());
-                            recordsProcessed++;
-                            if (recordsProcessed % 100 == 0) {
-                                System.out.println("Echo records processed = " + recordsProcessed);
-                            }
-                            received.get(key).add(value);
-                            break;
-                        case "min":
-                            min.put(key, intSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "max":
-                            max.put(key, intSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "dif":
-                            dif.put(key, intSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "sum":
-                            sum.put(key, longSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "cnt":
-                            cnt.put(key, longSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "avg":
-                            avg.put(key, doubleSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "wcnt":
-                            wcnt.put(key, longSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        case "tagg":
-                            tagg.put(key, longSerde.deserializer().deserialize("", record.value()));
-                            break;
-                        default:
-                            System.out.println("unknown topic: " + record.topic());
+                retry = 0;
+                for (final ConsumerRecord<String, Number> record : records) {
+                    final String key = record.key();
+
+                    final String topic = record.topic();
+                    processed.get(topic).incrementAndGet();
+
+                    if (topic.equals("echo")) {
+                        recordsProcessed++;
+                        if (recordsProcessed % 100 == 0) {
+                            System.out.println("Echo records processed = " + recordsProcessed);
+                        }
                     }
+
+                    events.computeIfAbsent(topic, t -> new HashMap<>())
+                          .computeIfAbsent(key, k -> new LinkedList<>())
+                          .add(record);
                 }
+
+                System.out.println(processed);
             }
         }
         consumer.close();
@@ -357,256 +334,167 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
 
         boolean success;
-        success = allData.equals(received);
+
+        final Map<String, Set<Number>> received =
+            events.get("echo")
+                  .entrySet()
+                  .stream()
+                  .map(entry -> mkEntry(
+                      entry.getKey(),
+                      entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+                  )
+                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        success = inputs.equals(received);
 
         if (success) {
             System.out.println("ALL-RECORDS-DELIVERED");
         } else {
             int missedCount = 0;
-            for (final Map.Entry<String, Set<Integer>> entry : allData.entrySet()) {
+            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
                 missedCount += received.get(entry.getKey()).size();
             }
             System.out.println("missedRecords=" + missedCount);
         }
 
-        success &= verifyMin(min, allData, true);
-        success &= verifyMax(max, allData, true);
-        success &= verifyDif(dif, allData, true);
-        success &= verifySum(sum, allData, true);
-        success &= verifyCnt(cnt, allData, true);
-        success &= verifyAvg(avg, allData, true);
-        success &= verifyTAgg(tagg, allData, true);
+        // give it one more try if it's not already passing.
+        if (!verificationResult.passed()) {
+            verificationResult = verifyAll(inputs, events);
+        }
+        success &= verificationResult.passed();
+
+        System.out.println(verificationResult.result());
 
         System.out.println(success ? "SUCCESS" : "FAILURE");
+        return verificationResult;
     }
 
-    private static boolean verifyMin(final Map<String, Integer> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("min is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying min");
-            }
+    public static class VerificationResult {
+        private final boolean passed;
+        private final String result;
 
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Integer> entry : map.entrySet()) {
-                final int expected = getMin(entry.getKey());
-                if (expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
+        VerificationResult(final boolean passed, final String result) {
+            this.passed = passed;
+            this.result = result;
         }
-        return true;
-    }
 
-    private static boolean verifyMax(final Map<String, Integer> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("max is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying max");
-            }
+        public boolean passed() {
+            return passed;
+        }
 
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Integer> entry : map.entrySet()) {
-                final int expected = getMax(entry.getKey());
-                if (expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
+        public String result() {
+            return result;
         }
-        return true;
     }
 
-    private static boolean verifyDif(final Map<String, Integer> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("dif is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying dif");
-            }
-
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Integer> entry : map.entrySet()) {
-                final int min = getMin(entry.getKey());
-                final int max = getMax(entry.getKey());
-                final int expected = max - min;
-                if (entry.getValue() == null || expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
+    private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
+                                                final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events) {
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        boolean pass;
+        try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"));
+            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin);
+            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax);
+            pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue());
+            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L);
+            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg);
         }
-        return true;
+        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
     }
 
-    private static boolean verifyCnt(final Map<String, Long> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("cnt is empty");
-            }
+    private static boolean verify(final PrintStream resultStream,
+                                  final String topic,
+                                  final Map<String, Set<Integer>> inputData,
+                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                  final Function<String, Number> keyToExpectation) {
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.get("data");
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+        if (outputEvents.isEmpty()) {
+            resultStream.println(topic + " is empty");
             return false;
         } else {
-            if (print) {
-                System.out.println("verifying cnt");
-            }
+            resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
 
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
+            if (outputEvents.size() != inputData.size()) {
+                resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+                                    outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
                 return false;
             }
-            for (final Map.Entry<String, Long> entry : map.entrySet()) {
-                final int min = getMin(entry.getKey());
-                final int max = getMax(entry.getKey());
-                final long expected = (max - min) + 1L;
-                if (expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue() + " expected=" + expected);
-                    }
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+                final String key = entry.getKey();
+                final Number expected = keyToExpectation.apply(key);
+                final Number actual = entry.getValue().getLast().value();
+                if (!expected.equals(actual)) {
+                    resultStream.printf("%s fail: key=%s actual=%s expected=%s%n\t inputEvents=%n%s%n\toutputEvents=%n%s%n",
+                                        topic,
+                                        key,
+                                        actual,
+                                        expected,
+                                        indent("\t\t", observedInputEvents.get(key)),
+                                        indent("\t\t", entry.getValue()));
                     return false;
                 }
             }
+            return true;
         }
-        return true;
     }
 
-    private static boolean verifySum(final Map<String, Long> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("sum is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying sum");
-            }
-
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Long> entry : map.entrySet()) {
-                final int min = getMin(entry.getKey());
-                final int max = getMax(entry.getKey());
-                final long expected = ((long) min + (long) max) * (max - min + 1L) / 2L;
-                if (expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
+    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
+                                 final LinkedList<ConsumerRecord<String, Number>> list) {
+        final StringBuilder stringBuilder = new StringBuilder();
+        for (final ConsumerRecord<String, Number> record : list) {
+            stringBuilder.append(prefix).append(record).append('\n');
         }
-        return true;
+        return stringBuilder.toString();
     }
 
-    private static boolean verifyAvg(final Map<String, Double> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("avg is empty");
-            }
-            return false;
-        } else {
-            if (print) {
-                System.out.println("verifying avg");
-            }
+    private static Long getSum(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        return ((long) min + (long) max) * (max - min + 1L) / 2L;
+    }
 
-            if (map.size() != allData.size()) {
-                if (print) {
-                    System.out.println("fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
-                }
-                return false;
-            }
-            for (final Map.Entry<String, Double> entry : map.entrySet()) {
-                final int min = getMin(entry.getKey());
-                final int max = getMax(entry.getKey());
-                final double expected = ((long) min + (long) max) / 2.0;
-
-                if (entry.getValue() == null || expected != entry.getValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue() + " expected=" + expected);
-                    }
-                    return false;
-                }
-            }
-        }
-        return true;
+    private static Double getAvg(final String key) {
+        final int min = getMin(key).intValue();
+        final int max = getMax(key).intValue();
+        return ((long) min + (long) max) / 2.0;
     }
 
 
-    private static boolean verifyTAgg(final Map<String, Long> map, final Map<String, Set<Integer>> allData, final boolean print) {
-        if (map.isEmpty()) {
-            if (print) {
-                System.out.println("tagg is empty");
-            }
+    private static boolean verifyTAgg(final PrintStream resultStream,
+                                      final Map<String, Set<Integer>> allData,
+                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents) {
+        if (taggEvents == null) {
+            resultStream.println("tagg is missing");
+            return false;
+        } else if (taggEvents.isEmpty()) {
+            resultStream.println("tagg is empty");
             return false;
         } else {
-            if (print) {
-                System.out.println("verifying tagg");
-            }
+            resultStream.println("verifying tagg");
 
             // generate expected answer
             final Map<String, Long> expected = new HashMap<>();
             for (final String key : allData.keySet()) {
-                final int min = getMin(key);
-                final int max = getMax(key);
+                final int min = getMin(key).intValue();
+                final int max = getMax(key).intValue();
                 final String cnt = Long.toString(max - min + 1L);
 
-                if (expected.containsKey(cnt)) {
-                    expected.put(cnt, expected.get(cnt) + 1L);
-                } else {
-                    expected.put(cnt, 1L);
-                }
+                expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1);
             }
 
             // check the result
-            for (final Map.Entry<String, Long> entry : map.entrySet()) {
+            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
                 final String key = entry.getKey();
                 Long expectedCount = expected.remove(key);
                 if (expectedCount == null) {
                     expectedCount = 0L;
                 }
 
-                if (entry.getValue().longValue() != expectedCount.longValue()) {
-                    if (print) {
-                        System.out.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
-                    }
+                if (entry.getValue().getLast().value().longValue() != expectedCount) {
+                    resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expected.get(key));
+                    resultStream.println("\t outputEvents: " + entry.getValue());
                     return false;
                 }
             }
@@ -615,26 +503,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return true;
     }
 
-    private static int getMin(final String key) {
+    private static Number getMin(final String key) {
         return Integer.parseInt(key.split("-")[0]);
     }
 
-    private static int getMax(final String key) {
+    private static Number getMax(final String key) {
         return Integer.parseInt(key.split("-")[1]);
     }
 
-    private static int getMinFromWKey(final String key) {
-        return getMin(key.split("@")[0]);
-    }
-
-    private static int getMaxFromWKey(final String key) {
-        return getMax(key.split("@")[0]);
-    }
-
-    private static long getStartFromWKey(final String key) {
-        return Long.parseLong(key.split("@")[1]);
-    }
-
     private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
         final ArrayList<TopicPartition> partitions = new ArrayList<>();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 9e62e3f..aa58d44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -29,16 +29,17 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 import java.io.File;
+import java.time.Instant;
 
 public class SmokeTestUtil {
 
     final static int END = Integer.MAX_VALUE;
 
     static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
-        return printProcessorSupplier(topic, false);
+        return printProcessorSupplier(topic, "");
     }
 
-    private static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final boolean printOffset) {
+    static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final String name) {
         return new ProcessorSupplier<Object, Object>() {
             @Override
             public Processor<Object, Object> get() {
@@ -56,7 +57,7 @@ public class SmokeTestUtil {
                     public void process(final Object key, final Object value) {
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
-                            System.out.println(System.currentTimeMillis());
+                            System.out.printf("%s: %s%n", name, Instant.now());
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                         }
                     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index f34471b..4c2f6d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 
 public class StreamsSmokeTest {
 
@@ -56,9 +57,6 @@ public class StreamsSmokeTest {
         System.out.println("disableAutoTerminate=" + disableAutoTerminate);
 
         switch (command) {
-            case "standalone":
-                SmokeTestDriver.main(args);
-                break;
             case "run":
                 // this starts the driver (data generation and result verification)
                 final int numKeys = 10;
@@ -66,14 +64,14 @@ public class StreamsSmokeTest {
                 if (disableAutoTerminate) {
                     SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
                 } else {
-                    final Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                    final Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, true);
                     SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
                 }
                 break;
             case "process":
                 // this starts a KafkaStreams client
-                final SmokeTestClient client = new SmokeTestClient(streamsProperties);
-                client.start();
+                final SmokeTestClient client = new SmokeTestClient(UUID.randomUUID().toString());
+                client.start(streamsProperties);
                 break;
             case "close-deadlock-test":
                 final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);


Mime
View raw message