kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Stabilize flaky smoke system tests before KIP-91
Date Tue, 19 Dec 2017 01:35:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 45a7bb02f -> d2cbd36e2


MINOR: Stabilize flaky smoke system tests before KIP-91

This is a workaround until KIP-91 is merged. We tried increasing the timeout multiple times
already but tests are still flaky.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Apurva Mehta <apurva@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #4329 from mjsax/hotfix-system-tests

(cherry picked from commit 22f742cdd2899b76c1b4222863ee02ad3bc749a1)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2cbd36e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2cbd36e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2cbd36e

Branch: refs/heads/1.0
Commit: d2cbd36e2a9642414e0b1ca786425380b04b2e31
Parents: 45a7bb0
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Mon Dec 18 17:34:50 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Dec 18 17:35:38 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/tests/SmokeTestDriver.java    | 53 ++++++++++++++++----
 1 file changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2cbd36e/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index a5aef2a..fdc9326 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Exit;
@@ -155,6 +156,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         int remaining = data.length;
 
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
         while (remaining > 0) {
             int index = rand.nextInt(remaining);
             String key = data[index].key;
@@ -168,29 +171,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
 
-                producer.send(record, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                        if (exception != null) {
-                            exception.printStackTrace();
-                            Exit.exit(1);
-                        }
-                    }
-                });
-
+                producer.send(record, new TestCallback(record, needRetry));
 
                 numRecordsProduced++;
                 allData.get(key).add(value);
                 if (numRecordsProduced % 100 == 0)
                     System.out.println(numRecordsProduced + " records produced");
                 Utils.sleep(2);
+            }
+        }
+        producer.flush();
 
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                producer.send(record, new TestCallback(record, needRetry2));
+            }
+            producer.flush();
+            needRetry = needRetry2;
+
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
             }
         }
+
         producer.close();
         return Collections.unmodifiableMap(allData);
     }
 
+    private static class TestCallback implements Callback {
+        private final ProducerRecord<byte[], byte[]> originalRecord;
+        private final List<ProducerRecord<byte[], byte[]>> needRetry;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            this.originalRecord = originalRecord;
+            this.needRetry = needRetry;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+            if (exception != null) {
+                if (exception instanceof TimeoutException) {
+                    needRetry.add(originalRecord);
+                } else {
+                    exception.printStackTrace();
+                    Exit.exit(1);
+                }
+            }
+        }
+    }
+
     private static void shuffle(int[] data, int windowSize) {
         Random rand = new Random();
         for (int i = 0; i < data.length; i++) {


Mime
View raw message