kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: (re)add equals/hashCode to *Windows (#5510)
Date Wed, 15 Aug 2018 21:30:31 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a4a65ab  MINOR: (re)add equals/hashCode to *Windows (#5510)
a4a65ab is described below

commit a4a65abcd3e5a01e0910afb10a214f4bb47577a3
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Aug 15 16:30:24 2018 -0500

    MINOR: (re)add equals/hashCode to *Windows (#5510)
    
    Andy Coates <big-andy-coates@users.noreply.github.com>, Bill Bejeck <bill@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>
---
 .../apache/kafka/streams/kstream/JoinWindows.java  | 25 ++++++
 .../kafka/streams/kstream/SessionWindows.java      | 27 ++++++-
 .../apache/kafka/streams/kstream/TimeWindows.java  | 25 ++++++
 .../kafka/streams/kstream/UnlimitedWindows.java    | 22 ++++++
 .../org/apache/kafka/streams/kstream/Windows.java  | 36 +++++++++
 .../kafka/streams/kstream/JoinWindowsTest.java     | 91 ++++++++++++++++++++++
 .../kafka/streams/kstream/SessionWindowsTest.java  | 38 +++++++++
 .../kafka/streams/kstream/TimeWindowsTest.java     | 77 ++++++++++++++++++
 .../streams/kstream/UnlimitedWindowsTest.java      | 17 ++++
 9 files changed, 357 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index bd31175..f91182e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * The window specifications used for joins.
@@ -168,4 +169,28 @@ public final class JoinWindows extends Windows<Window> {
     public long maintainMs() {
         return Math.max(super.maintainMs(), size());
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+        final JoinWindows that = (JoinWindows) o;
+        return beforeMs == that.beforeMs &&
+            afterMs == that.afterMs;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), beforeMs, afterMs);
+    }
+
+    @Override
+    public String toString() {
+        return "JoinWindows{" +
+            "beforeMs=" + beforeMs +
+            ", afterMs=" + afterMs +
+            ", super=" + super.toString() +
+            '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 96aea0a..c4dd91d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 
 import java.time.Duration;
+import java.util.Objects;
 
 
 /**
@@ -111,7 +112,7 @@ public final class SessionWindows {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot
be smaller than window gap.");
         }
 
-        return new SessionWindows(gapMs, durationMs, null);
+        return new SessionWindows(gapMs, durationMs, grace);
     }
 
     /**
@@ -168,4 +169,28 @@ public final class SessionWindows {
         return Math.max(maintainDurationMs, gapMs);
     }
 
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final SessionWindows that = (SessionWindows) o;
+        return gapMs == that.gapMs &&
+            maintainDurationMs == that.maintainDurationMs &&
+            Objects.equals(grace, that.grace);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(gapMs, maintainDurationMs, grace);
+    }
+
+    @Override
+    public String toString() {
+        return "SessionWindows{" +
+            "gapMs=" + gapMs +
+            ", maintainDurationMs=" + maintainDurationMs +
+            ", grace=" + grace +
+            '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 808b006..8b62a43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * The fixed-size time-based window specifications used for aggregations.
@@ -157,4 +158,28 @@ public final class TimeWindows extends Windows<TimeWindow> {
     public long maintainMs() {
         return Math.max(super.maintainMs(), sizeMs);
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+        final TimeWindows that = (TimeWindows) o;
+        return sizeMs == that.sizeMs &&
+            advanceMs == that.advanceMs;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), sizeMs, advanceMs);
+    }
+
+    @Override
+    public String toString() {
+        return "TimeWindows{" +
+            "sizeMs=" + sizeMs +
+            ", advanceMs=" + advanceMs +
+            ", super=" + super.toString() +
+            '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index e795a2c..cf4482f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * The unlimited window specifications used for aggregations.
@@ -129,4 +130,25 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow>
{
         throw new IllegalArgumentException("Grace period cannot be set for UnlimitedWindows.");
     }
 
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+        final UnlimitedWindows that = (UnlimitedWindows) o;
+        return startMs == that.startMs;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), startMs);
+    }
+
+    @Override
+    public String toString() {
+        return "UnlimitedWindows{" +
+            "startMs=" + startMs +
+            ", super=" + super.toString() +
+            '}';
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index adfc88a..406e44f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -21,12 +21,16 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
 import java.time.Duration;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * The window specification interface for fixed size windows that is used to define window
boundaries and grace period.
  *
  * Grace period defines how long to wait on late events, where lateness is defined as (stream_time
- record_timestamp).
  *
+ * Warning: It may be unsafe to use objects of this class in set- or map-like collections,
+ * since the equals and hashCode methods depend on mutable fields.
+ *
  * @param <W> type of the window instance
  * @see TimeWindows
  * @see UnlimitedWindows
@@ -155,4 +159,36 @@ public abstract class Windows<W extends Window> {
      * @return the size of the specified windows
      */
     public abstract long size();
+
+    /**
+     * Warning: It may be unsafe to use objects of this class in set- or map-like collections,
+     * since the equals and hashCode methods depend on mutable fields.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final Windows<?> windows = (Windows<?>) o;
+        return maintainMs() == windows.maintainMs() &&
+            segments == windows.segments &&
+            Objects.equals(gracePeriodMs(), windows.gracePeriodMs());
+    }
+
+    /**
+     * Warning: It may be unsafe to use objects of this class in set- or map-like collections,
+     * since the equals and hashCode methods depend on mutable fields.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(maintainMs(), segments, gracePeriodMs());
+    }
+
+    @Override
+    public String toString() {
+        return "Windows{" +
+            "maintainDurationMs=" + maintainMs() +
+            ", segments=" + segments +
+            ", grace=" + gracePeriodMs() +
+            '}';
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 5576b93..ae79371 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 
@@ -102,4 +103,94 @@ public class JoinWindowsTest {
         }
     }
 
+    @Test
+    public void equalsAndHashcodeShouldBeValidForPositiveCases() {
+        assertEquals(JoinWindows.of(3), JoinWindows.of(3));
+        assertEquals(JoinWindows.of(3).hashCode(), JoinWindows.of(3).hashCode());
+
+        assertEquals(JoinWindows.of(3).after(2), JoinWindows.of(3).after(2));
+        assertEquals(JoinWindows.of(3).after(2).hashCode(), JoinWindows.of(3).after(2).hashCode());
+
+        assertEquals(JoinWindows.of(3).before(2), JoinWindows.of(3).before(2));
+        assertEquals(JoinWindows.of(3).before(2).hashCode(), JoinWindows.of(3).before(2).hashCode());
+
+        assertEquals(JoinWindows.of(3).grace(2), JoinWindows.of(3).grace(2));
+        assertEquals(JoinWindows.of(3).grace(2).hashCode(), JoinWindows.of(3).grace(2).hashCode());
+
+        assertEquals(JoinWindows.of(3).until(60), JoinWindows.of(3).until(60));
+        assertEquals(JoinWindows.of(3).until(60).hashCode(), JoinWindows.of(3).until(60).hashCode());
+
+        assertEquals(
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60)
+        );
+        assertEquals(
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60).hashCode(),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60).hashCode()
+        );
+        // JoinWindows is a little weird in that before and after set the same fields as
of.
+        assertEquals(
+            JoinWindows.of(9).before(1).after(2).grace(3).until(60),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60)
+        );
+        assertEquals(
+            JoinWindows.of(9).before(1).after(2).grace(3).until(60).hashCode(),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60).hashCode()
+        );
+    }
+
+    @Test
+    public void equalsAndHashcodeShouldBeValidForNegativeCases() {
+        assertNotEquals(JoinWindows.of(9), JoinWindows.of(3));
+        assertNotEquals(JoinWindows.of(9).hashCode(), JoinWindows.of(3).hashCode());
+
+        assertNotEquals(JoinWindows.of(3).after(9), JoinWindows.of(3).after(2));
+        assertNotEquals(JoinWindows.of(3).after(9).hashCode(), JoinWindows.of(3).after(2).hashCode());
+
+        assertNotEquals(JoinWindows.of(3).before(9), JoinWindows.of(3).before(2));
+        assertNotEquals(JoinWindows.of(3).before(9).hashCode(), JoinWindows.of(3).before(2).hashCode());
+
+        assertNotEquals(JoinWindows.of(3).grace(9), JoinWindows.of(3).grace(2));
+        assertNotEquals(JoinWindows.of(3).grace(9).hashCode(), JoinWindows.of(3).grace(2).hashCode());
+
+        assertNotEquals(JoinWindows.of(3).until(90), JoinWindows.of(3).until(60));
+        assertNotEquals(JoinWindows.of(3).until(90).hashCode(), JoinWindows.of(3).until(60).hashCode());
+
+
+        assertNotEquals(
+            JoinWindows.of(3).before(9).after(2).grace(3).until(60),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60)
+        );
+        assertNotEquals(
+            JoinWindows.of(3).before(9).after(2).grace(3).until(60).hashCode(),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60).hashCode()
+        );
+
+        assertNotEquals(
+            JoinWindows.of(3).before(1).after(9).grace(3).until(60),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60)
+        );
+        assertNotEquals(
+            JoinWindows.of(3).before(1).after(9).grace(3).until(60).hashCode(),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60).hashCode()
+        );
+
+        assertNotEquals(
+            JoinWindows.of(3).before(1).after(2).grace(9).until(60),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60)
+        );
+        assertNotEquals(
+            JoinWindows.of(3).before(1).after(2).grace(9).until(60).hashCode(),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60).hashCode()
+        );
+
+        assertNotEquals(
+            JoinWindows.of(3).before(1).after(2).grace(3).until(90),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60)
+        );
+        assertNotEquals(
+            JoinWindows.of(3).before(1).after(2).grace(3).until(90).hashCode(),
+            JoinWindows.of(3).before(1).after(2).grace(3).until(60).hashCode()
+        );
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index c464c75..2acd5d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class SessionWindowsTest {
@@ -77,4 +78,41 @@ public class SessionWindowsTest {
             // expected
         }
     }
+
+    @Test
+    public void equalsAndHashcodeShouldBeValidForPositiveCases() {
+        assertEquals(SessionWindows.with(1), SessionWindows.with(1));
+        assertEquals(SessionWindows.with(1).hashCode(), SessionWindows.with(1).hashCode());
+
+        assertEquals(SessionWindows.with(1).grace(6), SessionWindows.with(1).grace(6));
+        assertEquals(SessionWindows.with(1).grace(6).hashCode(), SessionWindows.with(1).grace(6).hashCode());
+
+        assertEquals(SessionWindows.with(1).until(7), SessionWindows.with(1).until(7));
+        assertEquals(SessionWindows.with(1).until(7).hashCode(), SessionWindows.with(1).until(7).hashCode());
+
+        assertEquals(SessionWindows.with(1).grace(6).until(7), SessionWindows.with(1).grace(6).until(7));
+        assertEquals(SessionWindows.with(1).grace(6).until(7).hashCode(), SessionWindows.with(1).grace(6).until(7).hashCode());
+    }
+
+    @Test
+    public void equalsAndHashcodeShouldBeValidForNegativeCases() {
+        assertNotEquals(SessionWindows.with(9), SessionWindows.with(1));
+        assertNotEquals(SessionWindows.with(9).hashCode(), SessionWindows.with(1).hashCode());
+
+        assertNotEquals(SessionWindows.with(1).grace(9), SessionWindows.with(1).grace(6));
+        assertNotEquals(SessionWindows.with(1).grace(9).hashCode(), SessionWindows.with(1).grace(6).hashCode());
+
+        assertNotEquals(SessionWindows.with(1).until(9), SessionWindows.with(1).until(7));
+        assertNotEquals(SessionWindows.with(1).until(9).hashCode(), SessionWindows.with(1).until(7).hashCode());
+
+
+        assertNotEquals(SessionWindows.with(2).grace(6).until(7), SessionWindows.with(1).grace(6).until(7));
+        assertNotEquals(SessionWindows.with(2).grace(6).until(7).hashCode(), SessionWindows.with(1).grace(6).until(7).hashCode());
+
+        assertNotEquals(SessionWindows.with(1).grace(0).until(7), SessionWindows.with(1).grace(6).until(7));
+        assertNotEquals(SessionWindows.with(1).grace(0).until(7).hashCode(), SessionWindows.with(1).grace(6).until(7).hashCode());
+
+        assertNotEquals(SessionWindows.with(1).grace(6).until(70), SessionWindows.with(1).grace(6).until(7));
+        assertNotEquals(SessionWindows.with(1).grace(6).until(70).hashCode(), SessionWindows.with(1).grace(6).until(7).hashCode());
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index b8d3bfd..d426b28 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -22,6 +22,7 @@ import org.junit.Test;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class TimeWindowsTest {
@@ -146,4 +147,80 @@ public class TimeWindowsTest {
         assertEquals(new TimeWindow(12L, 24L), matched.get(12L));
     }
 
+
+    @Test
+    public void equalsAndHashcodeShouldBeValidForPositiveCases() {
+        assertEquals(TimeWindows.of(3), TimeWindows.of(3));
+        assertEquals(TimeWindows.of(3).hashCode(), TimeWindows.of(3).hashCode());
+
+        assertEquals(TimeWindows.of(3).advanceBy(1), TimeWindows.of(3).advanceBy(1));
+        assertEquals(TimeWindows.of(3).advanceBy(1).hashCode(), TimeWindows.of(3).advanceBy(1).hashCode());
+
+        assertEquals(TimeWindows.of(3).grace(1), TimeWindows.of(3).grace(1));
+        assertEquals(TimeWindows.of(3).grace(1).hashCode(), TimeWindows.of(3).grace(1).hashCode());
+
+        assertEquals(TimeWindows.of(3).until(4), TimeWindows.of(3).until(4));
+        assertEquals(TimeWindows.of(3).until(4).hashCode(), TimeWindows.of(3).until(4).hashCode());
+
+        assertEquals(
+            TimeWindows.of(3).advanceBy(1).grace(1).until(4),
+            TimeWindows.of(3).advanceBy(1).grace(1).until(4)
+        );
+        assertEquals(
+            TimeWindows.of(3).advanceBy(1).grace(1).until(4).hashCode(),
+            TimeWindows.of(3).advanceBy(1).grace(1).until(4).hashCode()
+        );
+    }
+
+    @Test
+    public void equalsAndHashcodeShouldBeValidForNegativeCases() {
+        assertNotEquals(TimeWindows.of(9), TimeWindows.of(3));
+        assertNotEquals(TimeWindows.of(9).hashCode(), TimeWindows.of(3).hashCode());
+
+        assertNotEquals(TimeWindows.of(3).advanceBy(2), TimeWindows.of(3).advanceBy(1));
+        assertNotEquals(TimeWindows.of(3).advanceBy(2).hashCode(), TimeWindows.of(3).advanceBy(1).hashCode());
+
+        assertNotEquals(TimeWindows.of(3).grace(2), TimeWindows.of(3).grace(1));
+        assertNotEquals(TimeWindows.of(3).grace(2).hashCode(), TimeWindows.of(3).grace(1).hashCode());
+
+        assertNotEquals(TimeWindows.of(3).until(9), TimeWindows.of(3).until(4));
+        assertNotEquals(TimeWindows.of(3).until(9).hashCode(), TimeWindows.of(3).until(4).hashCode());
+
+
+        assertNotEquals(
+            TimeWindows.of(4).advanceBy(2).grace(2).until(4),
+            TimeWindows.of(3).advanceBy(2).grace(2).until(4)
+        );
+        assertNotEquals(
+            TimeWindows.of(4).advanceBy(2).grace(2).until(4).hashCode(),
+            TimeWindows.of(3).advanceBy(2).grace(2).until(4).hashCode()
+        );
+
+        assertNotEquals(
+            TimeWindows.of(3).advanceBy(1).grace(2).until(4),
+            TimeWindows.of(3).advanceBy(2).grace(2).until(4)
+        );
+        assertNotEquals(
+            TimeWindows.of(3).advanceBy(1).grace(2).until(4).hashCode(),
+            TimeWindows.of(3).advanceBy(2).grace(2).until(4).hashCode()
+        );
+
+        assertNotEquals(
+            TimeWindows.of(3).advanceBy(2).grace(1).until(4),
+            TimeWindows.of(3).advanceBy(2).grace(2).until(4)
+        );
+        assertNotEquals(
+            TimeWindows.of(3).advanceBy(2).grace(1).until(4).hashCode(),
+            TimeWindows.of(3).advanceBy(2).grace(2).until(4).hashCode()
+        );
+
+        assertNotEquals(
+            TimeWindows.of(3).advanceBy(2).grace(2).until(9),
+            TimeWindows.of(3).advanceBy(2).grace(2).until(4)
+        );
+        assertNotEquals(
+            TimeWindows.of(3).advanceBy(2).grace(2).until(9).hashCode(),
+            TimeWindows.of(3).advanceBy(2).grace(2).until(4).hashCode()
+        );
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
index 9798a81..5d139c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
@@ -22,6 +22,7 @@ import org.junit.Test;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -85,4 +86,20 @@ public class UnlimitedWindowsTest {
         assertTrue(matchedWindows.isEmpty());
     }
 
+    @Test
+    public void equalsAndHashcodeShouldBeValidForPositiveCases() {
+        assertEquals(UnlimitedWindows.of(), UnlimitedWindows.of());
+        assertEquals(UnlimitedWindows.of().hashCode(), UnlimitedWindows.of().hashCode());
+
+        assertEquals(UnlimitedWindows.of().startOn(1), UnlimitedWindows.of().startOn(1));
+        assertEquals(UnlimitedWindows.of().startOn(1).hashCode(), UnlimitedWindows.of().startOn(1).hashCode());
+
+    }
+
+    @Test
+    public void equalsAndHashcodeShouldBeValidForNegativeCases() {
+        assertNotEquals(UnlimitedWindows.of().startOn(9), UnlimitedWindows.of().startOn(1));
+        assertNotEquals(UnlimitedWindows.of().startOn(9).hashCode(), UnlimitedWindows.of().startOn(1).hashCode());
+    }
+
 }
\ No newline at end of file


Mime
View raw message