kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: HOTFIX: Check hasNext in KStreamWindowReduce
Date Sat, 18 Jun 2016 18:25:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4c6d7ed95 -> 0b0925a16


HOTFIX: Check hasNext in KStreamWindowReduce

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Matthias J. Sax

Closes #1520 from guozhangwang/KHotfix-iter-hasNext-window-value-getter


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0b0925a1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0b0925a1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0b0925a1

Branch: refs/heads/trunk
Commit: 0b0925a16f6cc94ad96fbc4dc2bcf48bf96557e6
Parents: 4c6d7ed
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Sat Jun 18 11:25:33 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Jun 18 11:25:33 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/internals/KStreamWindowReduce.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0b0925a1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 46d99a8..763ccdd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -135,13 +135,13 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
         return new KTableValueGetterSupplier<Windowed<K>, V>() {
 
             public KTableValueGetter<Windowed<K>, V> get() {
-                return new KStreamAggregateValueGetter();
+                return new KStreamWindowReduceValueGetter();
             }
 
         };
     }
 
-    private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+    private class KStreamWindowReduceValueGetter implements KTableValueGetter<Windowed<K>, V> {
 
         private WindowStore<K, V> windowStore;
 
@@ -159,7 +159,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
             // this iterator should only contain one element
             try (WindowStoreIterator<V> iter = windowStore.fetch(key, window.start(), window.start())) {
-                return iter.next().value;
+                return iter.hasNext() ? iter.next().value : null;
             }
         }
     }
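
Note on the second hunk: the guard matters because fetch() can return an iterator with no elements when nothing has been stored for that key and window. The sketch below is not Kafka code. It uses plain java.util iterators as a stand-in for WindowStoreIterator, and the class and method names (HasNextGuard, firstUnguarded, firstOrNull) are hypothetical. It assumes the store iterator follows the usual java.util.Iterator contract, under which next() on an exhausted iterator throws NoSuchElementException.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the value getter's read path; not part of Kafka.
final class HasNextGuard {

    // Mirrors the removed line: reads the first element without checking.
    // Throws NoSuchElementException when the iterator is empty.
    static <V> V firstUnguarded(Iterator<V> iter) {
        return iter.next();
    }

    // Mirrors the added line: an empty iterator is mapped to null.
    static <V> V firstOrNull(Iterator<V> iter) {
        return iter.hasNext() ? iter.next() : null;
    }

    public static void main(String[] args) {
        Iterator<String> empty = Collections.emptyIterator();
        System.out.println(firstOrNull(empty));
        System.out.println(firstOrNull(Arrays.asList("a").iterator()));
        try {
            firstUnguarded(Collections.<String>emptyIterator());
        } catch (NoSuchElementException e) {
            System.out.println("unguarded read threw NoSuchElementException");
        }
    }
}
```

Returning null for a missing entry lets the caller treat "no value for this window" as an ordinary result rather than an exception.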

