kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3784: TimeWindows#windowsFor calculation is incorrect
Date Fri, 03 Jun 2016 20:22:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 419e6517c -> c0537b5f0


KAFKA-3784: TimeWindows#windowsFor calculation is incorrect

- Fixed the logic calculating the windows that are affected by a new …event in the case
of hopping windows and a small overlap.
- Added a unit test that tests for the issue

Author: Tom Rybak <trybak@gmail.com>

Reviewers: Michael G. Noll, Matthias J. Sax, Guozhang Wang

Closes #1462 from trybak/bugfix/KAFKA-3784-TimeWindows#windowsFor-false-positives

(cherry picked from commit 234fa5a6949c9a5bfb4f543989c2ece84fcce033)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c0537b5f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c0537b5f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c0537b5f

Branch: refs/heads/0.10.0
Commit: c0537b5f059e025fa268d92a58c27d98540b7c5a
Parents: 419e651
Author: Tom Rybak <trybak@gmail.com>
Authored: Fri Jun 3 13:21:40 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jun 3 13:22:00 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/kstream/TimeWindows.java   | 4 +---
 .../org/apache/kafka/streams/kstream/TimeWindowsTest.java    | 8 ++++++++
 2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c0537b5f/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index e4ce883..001e92e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -99,9 +99,7 @@ public class TimeWindows extends Windows<TimeWindow> {
 
     @Override
     public Map<Long, TimeWindow> windowsFor(long timestamp) {
-        long enclosed = (size - 1) / advance;
-        long windowStart = Math.max(0, timestamp - timestamp % advance - enclosed * advance);
-
+        long windowStart = (Math.max(0, timestamp - this.size + this.advance) / this.advance)
* this.advance;
         Map<Long, TimeWindow> windows = new HashMap<>();
         while (windowStart <= timestamp) {
             TimeWindow window = new TimeWindow(windowStart, windowStart + this.size);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c0537b5f/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index e9ff235..62b12a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -113,6 +113,14 @@ public class TimeWindowsTest {
     }
 
     @Test
+    public void windowsForBarelyOverlappingHoppingWindows() {
+        TimeWindows windows = TimeWindows.of(anyName, 6L).advanceBy(5L);
+        Map<Long, TimeWindow> matched = windows.windowsFor(7L);
+        assertEquals(1, matched.size());
+        assertEquals(new TimeWindow(5L, 11L), matched.get(5L));
+    }
+
+    @Test
     public void windowsForTumblingWindows() {
         TimeWindows windows = TimeWindows.of(anyName, 12L);
         Map<Long, TimeWindow> matched = windows.windowsFor(21L);


Mime
View raw message