kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: Check JoinWindow boundaries
Date Thu, 30 Jun 2016 17:50:54 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a34f78dca -> 7e9f5a7ee


HOTFIX: Check JoinWindow boundaries

guozhangwang

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1575 from mjsax/hotfix2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7e9f5a7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7e9f5a7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7e9f5a7e

Branch: refs/heads/trunk
Commit: 7e9f5a7ee6dfe22107c8cc3c9f57daa288d6c392
Parents: a34f78d
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Jun 30 10:50:51 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jun 30 10:50:51 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindows.java      | 14 +++------
 .../kafka/streams/kstream/JoinWindowsTest.java  | 32 +++++++++++++-------
 2 files changed, 26 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7e9f5a7e/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 53ddf3e..309a9e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -42,7 +42,8 @@ import java.util.Map;
  * A join is symmetric in the sense, that a join specification on the first stream returns
the same result record as
  * a join specification on the second stream with flipped before and after values.
  * <p>
- * Both values (before and after) must not be negative and not zero at the same time.
+ * Both values (before and after) must not result in an "inverse" window,
+ * i.e., lower-interval-bound must not be larger than upper-interval.bound.
  */
 public class JoinWindows extends Windows<TimeWindow> {
 
@@ -54,14 +55,8 @@ public class JoinWindows extends Windows<TimeWindow> {
     private JoinWindows(String name, long before, long after) {
         super(name);
 
-        if (before < 0) {
-            throw new IllegalArgumentException("window size must be > 0 (you provided
before as " + before + ")");
-        }
-        if (after < 0) {
-            throw new IllegalArgumentException("window size must be > 0 (you provided
after as " + after + ")");
-        }
-        if (before == 0 && after == 0) {
-            throw new IllegalArgumentException("window size must be > 0 (you provided
0)");
+        if (before + after < 0) {
+            throw new IllegalArgumentException("Window interval (ie, before+after) must not
be negative");
         }
 
         this.after = after;
@@ -70,6 +65,7 @@ public class JoinWindows extends Windows<TimeWindow> {
 
     /**
      * Specifies that records of the same key are joinable if their timestamps are within
{@code timeDifference}.
+     * ({@code timeDifference} must not be negative)
      *
      * @param timeDifference    join window interval
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/7e9f5a7e/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index d8fa7b4..20efd45 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -29,7 +29,7 @@ public class JoinWindowsTest {
 
     private static String anyName = "window";
     private static long anySize = 123L;
-    private static long anyOtherSize = 456L;
+    private static long anyOtherSize = 456L; // should be larger than anySize
 
     @Test
     public void shouldHaveSaneEqualsAndHashCode() {
@@ -66,6 +66,21 @@ public class JoinWindowsTest {
         assertNotEquals("must be false when window sizes are different", differentWindowSize3,
w1);
     }
 
+    @Test
+    public void validWindows() {
+        JoinWindows.of(anyName, anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
+            .before(anySize)                    // [ -anySize ; anyOtherSize ]
+            .before(0)                          // [ 0 ; anyOtherSize ]
+            .before(-anySize)                   // [ anySize ; anyOtherSize ]
+            .before(-anyOtherSize);             // [ anyOtherSize ; anyOtherSize ]
+
+        JoinWindows.of(anyName, anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
+            .after(anySize)                     // [ -anyOtherSize ; anySize ]
+            .after(0)                           // [ -anyOtherSize ; 0 ]
+            .after(-anySize)                    // [ -anyOtherSize ; -anySize ]
+            .after(-anyOtherSize);              // [ -anyOtherSize ; -anyOtherSize ]
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void nameMustNotBeEmpty() {
         JoinWindows.of("", anySize);
@@ -77,23 +92,18 @@ public class JoinWindowsTest {
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void windowSizeMustNotBeNegative() {
+    public void timeDifferenceMustNotBeNegative() {
         JoinWindows.of(anyName, -1);
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void beforeMustNotBeNegative() {
-        JoinWindows.of(anyName, anySize).before(-1);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void afterSizeMustNotBeNegative() {
-        JoinWindows.of(anyName, anySize).after(-1);
+    public void afterBelowLower() {
+        JoinWindows.of(anyName, anySize).after(-anySize - 1);
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void windowSizeMustNotBeZero() {
-        JoinWindows.of(anyName, 0);
+    public void beforeOverUpper() {
+        JoinWindows.of(anyName, anySize).before(-anySize - 1);
     }
 
 }
\ No newline at end of file


Mime
View raw message