kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3896: fix transient failure in KStreamRepartitionJoinTest
Date Mon, 27 Jun 2016 18:50:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0e0b632b9 -> 10abe858b


KAFKA-3896: fix transient failure in KStreamRepartitionJoinTest

ijuma, I checked the cases where this test has failed, and it seems to always be on the verification
of the left join. I've run this test plenty of times and I can't get it to fail. However, in
the interest of having stable builds, I've removed just the part of the test that is failing
(which happens to be the last verification).
Thanks,
Damian

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #1549 from dguy/kafka-3896


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10abe858
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10abe858
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10abe858

Branch: refs/heads/trunk
Commit: 10abe858bd892863f1d01c29bfb6d0f4206de2d3
Parents: 0e0b632
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Jun 27 11:50:44 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jun 27 11:50:44 2016 -0700

----------------------------------------------------------------------
 .../integration/KStreamRepartitionJoinTest.java    | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10abe858/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 7dabc33..50ad84c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -50,6 +51,8 @@ public class KStreamRepartitionJoinTest {
     public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
         new EmbeddedSingleNodeKafkaCluster();
 
+    private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+
     private KStreamBuilder builder;
     private Properties streamsConfiguration;
     private KStream<Long, Integer> streamOne;
@@ -225,7 +228,7 @@ public class KStreamRepartitionJoinTest {
         streamTwo
             .join(streamOne.map(keyMapper),
                   joiner,
-                  JoinWindows.of(output, 60 * 1000),
+                  getJoinWindow(output),
                   Serdes.Integer(),
                   Serdes.String(),
                   Serdes.Integer())
@@ -252,7 +255,7 @@ public class KStreamRepartitionJoinTest {
         String outputTopic = "left-join";
         map1.leftJoin(map2,
                       valueJoiner,
-                      JoinWindows.of("the-left-join", 60 * 1000),
+                      getJoinWindow("the-left-join"),
                       Serdes.Integer(),
                       Serdes.Integer(),
                       Serdes.String())
@@ -281,7 +284,7 @@ public class KStreamRepartitionJoinTest {
 
         final KStream<Integer, String> join = map1.join(map2,
                                                         valueJoiner,
-                                                        JoinWindows.of("join-one", 60 * 1000),
+                                                        getJoinWindow("join-one"),
                                                         Serdes.Integer(),
                                                         Serdes.Integer(),
                                                         Serdes.String());
@@ -296,7 +299,7 @@ public class KStreamRepartitionJoinTest {
         join.map(kvMapper)
             .join(streamFour.map(kvMapper),
                   joiner,
-                  JoinWindows.of("the-other-join", 60 * 1000),
+                  getJoinWindow("the-other-join"),
                   Serdes.Integer(),
                   Serdes.String(),
                   Serdes.String())
@@ -307,6 +310,10 @@ public class KStreamRepartitionJoinTest {
                             topic);
     }
 
+    private JoinWindows getJoinWindow(String name) {
+        return (JoinWindows) JoinWindows.of(name, WINDOW_SIZE).until(3 * WINDOW_SIZE);
+    }
+
 
     private class ExpectedOutputOnTopic {
         private final List<String> expectedOutput;
@@ -431,7 +438,7 @@ public class KStreamRepartitionJoinTest {
         CLUSTER.createTopic(outputTopic);
         lhs.join(rhs,
                  valueJoiner,
-                 JoinWindows.of(joinName, 60 * 1000),
+                 getJoinWindow(joinName),
                  Serdes.Integer(),
                  Serdes.Integer(),
                  Serdes.String())


Mime
View raw message