kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: cherry-pick dummy
Date Thu, 23 Jun 2016 21:30:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 36f8de324 -> b669b2786


cherry-pick dummy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b669b278
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b669b278
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b669b278

Branch: refs/heads/0.10.0
Commit: b669b2786c69d2dfa719033eb7c02ac444f115f4
Parents: 36f8de3
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Jun 23 14:16:42 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jun 23 14:29:54 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindows.java      | 20 ++--
 .../kafka/streams/kstream/JoinWindowsTest.java  | 99 ++++++++++++++++++++
 .../kafka/streams/kstream/TimeWindowsTest.java  | 30 +++---
 .../kstream/internals/KStreamImplTest.java      |  5 +-
 .../internals/KStreamKStreamJoinTest.java       |  9 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |  6 +-
 6 files changed, 140 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b669b278/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index f45c064..53ddf3e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -41,6 +41,8 @@ import java.util.Map;
  * </ul>
  * A join is symmetric in the sense, that a join specification on the first stream returns
the same result record as
  * a join specification on the second stream with flipped before and after values.
+ * <p>
+ * Both values (before and after) must not be negative and not zero at the same time.
  */
 public class JoinWindows extends Windows<TimeWindow> {
 
@@ -52,21 +54,27 @@ public class JoinWindows extends Windows<TimeWindow> {
     private JoinWindows(String name, long before, long after) {
         super(name);
 
+        if (before < 0) {
+            throw new IllegalArgumentException("window size must be > 0 (you provided
before as " + before + ")");
+        }
+        if (after < 0) {
+            throw new IllegalArgumentException("window size must be > 0 (you provided
after as " + after + ")");
+        }
+        if (before == 0 && after == 0) {
+            throw new IllegalArgumentException("window size must be > 0 (you provided
0)");
+        }
+
         this.after = after;
         this.before = before;
     }
 
-    public static JoinWindows of(String name) {
-        return new JoinWindows(name, 0L, 0L);
-    }
-
     /**
      * Specifies that records of the same key are joinable if their timestamps are within
{@code timeDifference}.
      *
      * @param timeDifference    join window interval
      */
-    public JoinWindows within(long timeDifference) {
-        return new JoinWindows(this.name, timeDifference, timeDifference);
+    public static JoinWindows of(String name, long timeDifference) {
+        return new JoinWindows(name, timeDifference, timeDifference);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b669b278/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
new file mode 100644
index 0000000..d8fa7b4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+
+public class JoinWindowsTest {
+
+    private static String anyName = "window";
+    private static long anySize = 123L;
+    private static long anyOtherSize = 456L;
+
+    @Test
+    public void shouldHaveSaneEqualsAndHashCode() {
+        JoinWindows w1 = JoinWindows.of("w1", anySize);
+        JoinWindows w2 = JoinWindows.of("w2", anySize);
+
+        // Reflexive
+        assertEquals(w1, w1);
+        assertEquals(w1.hashCode(), w1.hashCode());
+
+        // Symmetric
+        assertEquals(w1, w2);
+        assertEquals(w2, w1);
+        assertEquals(w1.hashCode(), w2.hashCode());
+
+        JoinWindows w3 = JoinWindows.of("w3", w2.after).before(anyOtherSize);
+        JoinWindows w4 = JoinWindows.of("w4", anyOtherSize).after(w2.after);
+        assertEquals(w3, w4);
+        assertEquals(w4, w3);
+        assertEquals(w3.hashCode(), w4.hashCode());
+
+        // Inequality scenarios
+        assertNotEquals("must be false for null", null, w1);
+        assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"),
w1);
+        assertNotEquals("must be false for different types", new Object(), w1);
+
+        JoinWindows differentWindowSize = JoinWindows.of("differentWindowSize", w1.after
+ 1);
+        assertNotEquals("must be false when window sizes are different", differentWindowSize,
w1);
+
+        JoinWindows differentWindowSize2 = JoinWindows.of("differentWindowSize", w1.after).after(w1.after
+ 1);
+        assertNotEquals("must be false when window sizes are different", differentWindowSize2,
w1);
+
+        JoinWindows differentWindowSize3 = JoinWindows.of("differentWindowSize", w1.after).before(w1.before
+ 1);
+        assertNotEquals("must be false when window sizes are different", differentWindowSize3,
w1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nameMustNotBeEmpty() {
+        JoinWindows.of("", anySize);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nameMustNotBeNull() {
+        JoinWindows.of(null, anySize);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void windowSizeMustNotBeNegative() {
+        JoinWindows.of(anyName, -1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void beforeMustNotBeNegative() {
+        JoinWindows.of(anyName, anySize).before(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void afterSizeMustNotBeNegative() {
+        JoinWindows.of(anyName, anySize).after(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void windowSizeMustNotBeZero() {
+        JoinWindows.of(anyName, 0);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b669b278/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 62b12a9..5acd6e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -25,8 +25,7 @@ import org.junit.Test;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
 
 public class TimeWindowsTest {
 
@@ -39,31 +38,30 @@ public class TimeWindowsTest {
         TimeWindows w2 = TimeWindows.of("w2", w1.size);
 
         // Reflexive
-        assertTrue(w1.equals(w1));
-        assertTrue(w1.hashCode() == w1.hashCode());
+        assertEquals(w1, w1);
+        assertEquals(w1.hashCode(), w1.hashCode());
 
         // Symmetric
-        assertTrue(w1.equals(w2));
-        assertTrue(w1.hashCode() == w2.hashCode());
-        assertTrue(w2.hashCode() == w1.hashCode());
+        assertEquals(w1, w2);
+        assertEquals(w2, w1);
+        assertEquals(w1.hashCode(), w2.hashCode());
 
         // Transitive
         TimeWindows w3 = TimeWindows.of("w3", w2.size);
-        assertTrue(w2.equals(w3));
-        assertTrue(w2.hashCode() == w3.hashCode());
-        assertTrue(w1.equals(w3));
-        assertTrue(w1.hashCode() == w3.hashCode());
+        assertEquals(w2, w3);
+        assertEquals(w1, w3);
+        assertEquals(w1.hashCode(), w3.hashCode());
 
         // Inequality scenarios
-        assertFalse("must be false for null", w1.equals(null));
-        assertFalse("must be false for different window types", w1.equals(UnlimitedWindows.of("irrelevant")));
-        assertFalse("must be false for different types", w1.equals(new Object()));
+        assertNotEquals("must be false for null", null, w1);
+        assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"),
w1);
+        assertNotEquals("must be false for different types", new Object(), w1);
 
         TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size +
1);
-        assertFalse("must be false when window sizes are different", w1.equals(differentWindowSize));
+        assertNotEquals("must be false when window sizes are different", differentWindowSize,
w1);
 
         TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1);
-        assertFalse("must be false when advance intervals are different", w1.equals(differentAdvanceInterval));
+        assertNotEquals("must be false when advance intervals are different", differentAdvanceInterval,
w1);
     }
 
     @Test(expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b669b278/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 3d45d1d..6242702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -103,19 +103,20 @@ public class KStreamImplTest {
                 }
         );
 
+        final int anyWindowSize = 1;
         KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer,
Integer, Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-0"), stringSerde, intSerde, intSerde);
+        }, JoinWindows.of("join-0", anyWindowSize), stringSerde, intSerde, intSerde);
 
         KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer,
Integer, Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-1"), stringSerde, intSerde, intSerde);
+        }, JoinWindows.of("join-1", anyWindowSize), stringSerde, intSerde, intSerde);
 
         stream4.to("topic-5");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b669b278/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 6b0828a..aa7d117 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -77,7 +77,8 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100),
intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test",
100),
+                intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -175,7 +176,8 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100),
intSerde, stringSerde, stringSerde);
+        joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test",
100),
+                intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -275,7 +277,8 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100),
intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test",
100),
+                intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b669b278/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 65a4b54..5b12a30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -78,7 +78,8 @@ public class KStreamKStreamLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100),
intSerde, stringSerde);
+
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test",
100), intSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -156,7 +157,8 @@ public class KStreamKStreamLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100),
intSerde, stringSerde);
+
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test",
100), intSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();


Mime
View raw message