kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4551: StreamsSmokeTest.test_streams intermittent failure
Date Mon, 09 Jan 2017 19:13:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 42a6b7166 -> 52e397962


KAFKA-4551: StreamsSmokeTest.test_streams intermittent failure

Remove use of TestTimestampExtractor as it causes the logs to roll and segments to be deleted.
Remove the wcnt example as it is dependent on the TestTimestampExtractor - windowed counting
is covered elsewhere.
Change all aggregate operations to use TimeWindows, as use of UnlimitedWindows was causing logs
to roll and segments to be deleted.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang, Eno Thereska

Closes #2319 from dguy/smoke-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/52e39796
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/52e39796
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/52e39796

Branch: refs/heads/trunk
Commit: 52e397962b624f3c881b6f99e71c94da32cf6a33
Parents: 42a6b71
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Jan 9 11:13:09 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jan 9 11:13:09 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/UnlimitedWindows.java |   4 +
 .../streams/smoketest/SmokeTestClient.java      |  25 +-
 .../streams/smoketest/SmokeTestDriver.java      | 318 ++++++++++---------
 .../smoketest/TestTimestampExtractor.java       |  37 ---
 4 files changed, 178 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/52e39796/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 92f9ee9..3dc6f65 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -93,4 +93,8 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
         return (int) (start ^ (start >>> 32));
     }
 
+    @Override
+    public long maintainMs() {
+        return Long.MAX_VALUE;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/52e39796/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index f920c51..0e36c45 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.smoketest;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -27,15 +26,13 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.UnlimitedWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
 
 import java.io.File;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
@@ -84,7 +81,6 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
@@ -124,7 +120,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return (value < aggregate) ? value : aggregate;
                     }
                 },
-                UnlimitedWindows.of(),
+                TimeWindows.of(TimeUnit.DAYS.toMillis(1)),
                 intSerde, "uwin-min"
         ).toStream().map(
                 new Unwindow<String, Integer>()
@@ -146,7 +142,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return (value > aggregate) ? value : aggregate;
                     }
                 },
-                UnlimitedWindows.of(),
+                TimeWindows.of(TimeUnit.DAYS.toMillis(2)),
                 intSerde, "uwin-max"
         ).toStream().map(
                 new Unwindow<String, Integer>()
@@ -168,7 +164,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                         return (long) value + aggregate;
                     }
                 },
-                UnlimitedWindows.of(),
+                TimeWindows.of(TimeUnit.DAYS.toMillis(2)),
                 longSerde, "win-sum"
         ).toStream().map(
                 new Unwindow<String, Long>()
@@ -179,7 +175,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
 
         // cnt
-        groupedData.count(UnlimitedWindows.of(), "uwin-cnt")
+        groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2)), "uwin-cnt")
             .toStream().map(
                 new Unwindow<String, Long>()
         ).to(stringSerde, longSerde, "cnt");
@@ -206,17 +202,6 @@ public class SmokeTestClient extends SmokeTestUtil {
                 }
         ).to(stringSerde, doubleSerde, "avg");
 
-        // windowed count
-        groupedData.count(TimeWindows.of(WINDOW_SIZE), "tumbling-win-cnt")
-            .toStream().map(
-                new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>()
{
-                    @Override
-                    public KeyValue<String, Long> apply(Windowed<String> key,
Long value) {
-                        return new KeyValue<>(key.key() + "@" + key.window().start(),
value);
-                    }
-                }
-        ).to(stringSerde, longSerde, "wcnt");
-
         // test repartition
         Agg agg = new Agg();
         cntTable.groupBy(agg.selector(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/52e39796/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index f9d30d5..2ce7e1a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -21,13 +21,17 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -39,9 +43,12 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class SmokeTestDriver extends SmokeTestUtil {
 
+    public static final int MAX_RECORD_EMPTY_RETRIES = 60;
+
     private static class ValueList {
         public final String key;
         private final int[] values;
@@ -71,10 +78,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
     public static void main(String[] args) throws Exception {
         final String kafka = "localhost:9092";
         final String zookeeper = "localhost:2181";
-        final File stateDir = createDir("/tmp/kafka-streams-smoketest");
+        final File stateDir = TestUtils.tempDirectory();
 
-        final int numKeys = 10;
-        final int maxRecordsPerKey = 500;
+        final int numKeys = 20;
+        final int maxRecordsPerKey = 1000;
 
         Thread driver = new Thread() {
             public void run() {
@@ -116,7 +123,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
         driver.join();
 
         System.out.println("driver stopped");
-
         streams2.close();
         streams3.close();
         streams4.close();
@@ -153,27 +159,31 @@ public class SmokeTestDriver extends SmokeTestUtil {
             if (value < 0) {
                 remaining--;
                 data[index] = data[remaining];
-                value = END;
-            }
+            } else {
 
-            ProducerRecord<byte[], byte[]> record =
-                    new ProducerRecord<>("data", stringSerde.serializer().serialize("",
key), intSerde.serializer().serialize("", value));
+                ProducerRecord<byte[], byte[]> record =
+                        new ProducerRecord<>("data", stringSerde.serializer().serialize("",
key), intSerde.serializer().serialize("", value));
+
+                producer.send(record, new Callback() {
+                    @Override
+                    public void onCompletion(final RecordMetadata metadata, final Exception
exception) {
+                        if (exception != null) {
+                            exception.printStackTrace();
+                            System.exit(-1);
+                        }
+                    }
+                });
 
-            producer.send(record);
 
-            if (value != END) {
                 numRecordsProduced++;
                 allData.get(key).add(value);
-
                 if (numRecordsProduced % 100 == 0)
                     System.out.println(numRecordsProduced + " records produced");
+                Utils.sleep(2);
 
-                Thread.sleep(10);
             }
         }
-
         producer.close();
-
         return Collections.unmodifiableMap(allData);
     }
 
@@ -220,33 +230,34 @@ public class SmokeTestDriver extends SmokeTestUtil {
             keys.add(key);
             received.put(key, new HashSet<Integer>());
         }
-
-        int retryCount = 0;
-        int maxRetry = 360; // max three minutes (500ms * 360) (before we reach the end of
records)
-
-        while (true) {
+        int retry = 0;
+        final long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(3)) {
             ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
-            if (records.isEmpty()) {
-                retryCount++;
-                if (retryCount > maxRetry) break;
+            if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                if (verifyMin(min, allData, false)
+                    && verifyMax(max, allData, false)
+                    && verifyDif(dif, allData, false)
+                    && verifySum(sum, allData, false)
+                    && verifyCnt(cnt, allData, false)
+                    && verifyAvg(avg, allData, false)
+                    && verifyTAgg(tagg, allData, false)) {
+                    break;
+                }
+                if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                    break;
+                }
             } else {
-                retryCount = 0;
-
                 for (ConsumerRecord<byte[], byte[]> record : records) {
                     String key = stringSerde.deserializer().deserialize("", record.key());
                     switch (record.topic()) {
                         case "echo":
                             Integer value = intSerde.deserializer().deserialize("", record.value());
-                            if (value != null && value == END) {
-                                keys.remove(key);
-                                if (keys.isEmpty()) {
-                                    // we reached the end of records, set retry to 120 (max
60 seconds)
-                                    maxRetry = 120;
-                                }
-                            } else {
-                                recordsProcessed++;
-                                received.get(key).add(value);
+                            recordsProcessed++;
+                            if (recordsProcessed % 100 == 0) {
+                                System.out.println("Echo records processed = " + recordsProcessed);
                             }
+                            received.get(key).add(value);
                             break;
                         case "min":
                             min.put(key, intSerde.deserializer().deserialize("", record.value()));
@@ -279,7 +290,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
         }
         consumer.close();
-
+        final long finished = System.currentTimeMillis() - start;
+        System.out.println("Verification time=" + finished);
         System.out.println("-------------------");
         System.out.println("Result Verification");
         System.out.println("-------------------");
@@ -305,150 +317,189 @@ public class SmokeTestDriver extends SmokeTestUtil {
             System.out.println("missedRecords=" + missedCount);
         }
 
-        success &= verifyMin(min, allData);
-        success &= verifyMax(max, allData);
-        success &= verifyDif(dif, allData);
-        success &= verifySum(sum, allData);
-        success &= verifyCnt(cnt, allData);
-        success &= verifyAvg(avg, allData);
-        success &= verifyWCnt(wcnt, allData);
-        success &= verifyTAgg(tagg, allData);
+        success &= verifyMin(min, allData, true);
+        success &= verifyMax(max, allData, true);
+        success &= verifyDif(dif, allData, true);
+        success &= verifySum(sum, allData, true);
+        success &= verifyCnt(cnt, allData, true);
+        success &= verifyAvg(avg, allData, true);
+        success &= verifyTAgg(tagg, allData, true);
 
         System.out.println(success ? "SUCCESS" : "FAILURE");
     }
 
-    private static boolean verifyMin(Map<String, Integer> map, Map<String, Set<Integer>>
allData) {
-        boolean success = true;
+    private static boolean verifyMin(Map<String, Integer> map, Map<String, Set<Integer>>
allData, final boolean print) {
         if (map.isEmpty()) {
-            System.out.println("min is empty");
-            success = false;
+            if (print) {
+                System.out.println("min is empty");
+            }
+            return false;
         } else {
-            System.out.println("verifying min");
+            if (print) {
+                System.out.println("verifying min");
+            }
 
             if (map.size() != allData.size()) {
-                System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
-                success = false;
+                if (print) {
+                    System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
+                }
+                return false;
             }
             for (Map.Entry<String, Integer> entry : map.entrySet()) {
                 int expected = getMin(entry.getKey());
                 if (expected != entry.getValue()) {
-                    System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue()
+ " expected=" + expected);
-                    success = false;
+                    if (print) {
+                        System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue()
+ " expected=" + expected);
+                    }
+                    return false;
                 }
             }
         }
-        return success;
+        return true;
     }
 
-    private static boolean verifyMax(Map<String, Integer> map, Map<String, Set<Integer>>
allData) {
-        boolean success = true;
+    private static boolean verifyMax(Map<String, Integer> map, Map<String, Set<Integer>>
allData, final boolean print) {
         if (map.isEmpty()) {
-            System.out.println("max is empty");
-            success = false;
+            if (print) {
+                System.out.println("max is empty");
+            }
+            return false;
         } else {
-            System.out.println("verifying max");
+            if (print) {
+                System.out.println("verifying max");
+            }
 
             if (map.size() != allData.size()) {
-                System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
-                success = false;
+                if (print) {
+                    System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
+                }
+                return false;
             }
             for (Map.Entry<String, Integer> entry : map.entrySet()) {
                 int expected = getMax(entry.getKey());
                 if (expected != entry.getValue()) {
-                    System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue()
+ " expected=" + expected);
-                    success = false;
+                    if (print) {
+                        System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue()
+ " expected=" + expected);
+                    }
+                    return false;
                 }
             }
         }
-        return success;
+        return true;
     }
 
-    private static boolean verifyDif(Map<String, Integer> map, Map<String, Set<Integer>>
allData) {
-        boolean success = true;
+    private static boolean verifyDif(Map<String, Integer> map, Map<String, Set<Integer>>
allData, final boolean print) {
         if (map.isEmpty()) {
-            System.out.println("dif is empty");
-            success = false;
+            if (print) {
+                System.out.println("dif is empty");
+            }
+            return false;
         } else {
-            System.out.println("verifying dif");
+            if (print) {
+                System.out.println("verifying dif");
+            }
 
             if (map.size() != allData.size()) {
-                System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
-                success = false;
+                if (print) {
+                    System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
+                }
+                return false;
             }
             for (Map.Entry<String, Integer> entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
                 int expected = max - min;
                 if (entry.getValue() == null || expected != entry.getValue()) {
-                    System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue()
+ " expected=" + expected);
-                    success = false;
+                    if (print) {
+                        System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue()
+ " expected=" + expected);
+                    }
+                    return false;
                 }
             }
         }
-        return success;
+        return true;
     }
 
-    private static boolean verifyCnt(Map<String, Long> map, Map<String, Set<Integer>>
allData) {
-        boolean success = true;
+    private static boolean verifyCnt(Map<String, Long> map, Map<String, Set<Integer>>
allData, final boolean print) {
         if (map.isEmpty()) {
-            System.out.println("cnt is empty");
-            success = false;
+            if (print) {
+                System.out.println("cnt is empty");
+            }
+            return false;
         } else {
-            System.out.println("verifying cnt");
+            if (print) {
+                System.out.println("verifying cnt");
+            }
 
             if (map.size() != allData.size()) {
-                System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
-                success = false;
+                if (print) {
+                    System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
+                }
+                return false;
             }
             for (Map.Entry<String, Long> entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
                 long expected = (max - min) + 1L;
                 if (expected != entry.getValue()) {
-                    System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue()
+ " expected=" + expected);
-                    success = false;
+                    if (print) {
+                        System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue()
+ " expected=" + expected);
+                    }
+                    return false;
                 }
             }
         }
-        return success;
+        return true;
     }
 
-    private static boolean verifySum(Map<String, Long> map, Map<String, Set<Integer>>
allData) {
-        boolean success = true;
+    private static boolean verifySum(Map<String, Long> map, Map<String, Set<Integer>>
allData, final boolean print) {
         if (map.isEmpty()) {
-            System.out.println("sum is empty");
-            success = false;
+            if (print) {
+                System.out.println("sum is empty");
+            }
+            return false;
         } else {
-            System.out.println("verifying sum");
+            if (print) {
+                System.out.println("verifying sum");
+            }
 
             if (map.size() != allData.size()) {
-                System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
-                success = false;
+                if (print) {
+                    System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
+                }
+                return false;
             }
             for (Map.Entry<String, Long> entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
                 int max = getMax(entry.getKey());
                 long expected = ((long) min + (long) max) * (max - min + 1L) / 2L;
                 if (expected != entry.getValue()) {
-                    System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue()
+ " expected=" + expected);
-                    success = false;
+                    if (print) {
+                        System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue()
+ " expected=" + expected);
+                    }
+                    return false;
                 }
             }
         }
-        return success;
+        return true;
     }
 
-    private static boolean verifyAvg(Map<String, Double> map, Map<String, Set<Integer>>
allData) {
-        boolean success = true;
+    private static boolean verifyAvg(Map<String, Double> map, Map<String, Set<Integer>>
allData, final boolean print) {
         if (map.isEmpty()) {
-            System.out.println("avg is empty");
-            success = false;
+            if (print) {
+                System.out.println("avg is empty");
+            }
+            return false;
         } else {
-            System.out.println("verifying avg");
+            if (print) {
+                System.out.println("verifying avg");
+            }
 
             if (map.size() != allData.size()) {
-                System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
-                success = false;
+                if (print) {
+                    System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ allData.size());
+                }
+                return false;
             }
             for (Map.Entry<String, Double> entry : map.entrySet()) {
                 int min = getMin(entry.getKey());
@@ -456,58 +507,27 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 double expected = ((long) min + (long) max) / 2.0;
 
                 if (entry.getValue() == null || expected != entry.getValue()) {
-                    System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue()
+ " expected=" + expected);
-                    success = false;
+                    if (print) {
+                        System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue()
+ " expected=" + expected);
+                    }
+                    return false;
                 }
             }
         }
-        return success;
+        return true;
     }
 
-    private static boolean verifyWCnt(Map<String, Long> map, Map<String, Set<Integer>>
allData) {
-        boolean success = true;
-        if (map.isEmpty()) {
-            System.out.println("wcnt is empty");
-            success = false;
-        } else {
-            System.out.println("verifying wcnt");
-
-            int expectedSize = 0;
-            for (Set<Integer> values : allData.values()) {
-                int maxValue = Collections.max(values);
-                int minValue = Collections.min(values);
-                expectedSize += maxValue / WINDOW_SIZE + 1;
-                expectedSize -= minValue / WINDOW_SIZE;
-            }
-            if (map.size() != expectedSize) {
-                System.out.println("fail: resultCount=" + map.size() + " expectedCount="
+ expectedSize);
-                success = false;
-            }
-            for (Map.Entry<String, Long> entry : map.entrySet()) {
-                long minTime = getMinFromWKey(entry.getKey()) + START_TIME;
-                long maxTime = getMaxFromWKey(entry.getKey()) + START_TIME;
-                long winTime = getStartFromWKey(entry.getKey());
-
-                long expected = WINDOW_SIZE;
-                if (minTime > winTime) expected -= minTime - winTime;
-                if (maxTime < winTime + WINDOW_SIZE - 1) expected -= winTime + WINDOW_SIZE
- 1 - maxTime;
-
-                if (expected != entry.getValue()) {
-                    System.out.println("fail: key=" + entry.getKey() + " wcnt=" + entry.getValue()
+ " expected=" + expected);
-                    success = false;
-                }
-            }
-        }
-        return success;
-    }
 
-    private static boolean verifyTAgg(Map<String, Long> map, Map<String, Set<Integer>>
allData) {
-        boolean success = true;
+    private static boolean verifyTAgg(Map<String, Long> map, Map<String, Set<Integer>>
allData, final boolean print) {
         if (map.isEmpty()) {
-            System.out.println("tagg is empty");
-            success = false;
+            if (print) {
+                System.out.println("tagg is empty");
+            }
+            return false;
         } else {
-            System.out.println("verifying tagg");
+            if (print) {
+                System.out.println("verifying tagg");
+            }
 
             // generate expected answer
             Map<String, Long> expected = new HashMap<>();
@@ -531,15 +551,15 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     expectedCount = 0L;
 
                 if (entry.getValue() != expectedCount) {
-                    System.out.println("fail: key=" + key + " tagg=" + entry.getValue() +
" expected=" + expected.get(key));
-                    success = false;
+                    if (print) {
+                        System.out.println("fail: key=" + key + " tagg=" + entry.getValue()
+ " expected=" + expected.get(key));
+                    }
+                    return false;
                 }
             }
-            for (Map.Entry<String, Long> entry : expected.entrySet()) {
-                System.out.println("fail: missingKey=" + entry.getKey() + " expected=" +
entry.getValue());
-            }
+
         }
-        return success;
+        return true;
     }
 
     private static int getMin(String key) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/52e39796/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
deleted file mode 100644
index 0cab7f5..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.smoketest;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-public class TestTimestampExtractor implements TimestampExtractor {
-
-    private final long base = SmokeTestUtil.START_TIME;
-
-    @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp)
{
-        switch (record.topic()) {
-            case "data":
-                return base + (Integer) record.value();
-            default:
-                return System.currentTimeMillis();
-        }
-    }
-
-}


Mime
View raw message