kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6560: Use single query for getters as well (#4814)
Date Wed, 04 Apr 2018 02:23:12 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9313e18  KAFKA-6560: Use single query for getters as well (#4814)
9313e18 is described below

commit 9313e18fbbd974a17bfd132fe8b9ea594b26a97a
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Tue Apr 3 19:23:06 2018 -0700

    KAFKA-6560: Use single query for getters as well (#4814)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
John Roesler <john@confluent.io>
---
 .../kafka/streams/kstream/internals/KStreamWindowAggregate.java     | 6 +-----
 .../apache/kafka/streams/kstream/internals/KStreamWindowReduce.java | 6 +-----
 2 files changed, 2 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 27f8408..6953f7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Map;
 
@@ -132,10 +131,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window>
implements KStrea
             K key = windowedKey.key();
             W window = (W) windowedKey.window();
 
-            // this iterator should contain at most one element
-            try (WindowStoreIterator<T> iter = windowStore.fetch(key, window.start(),
window.start())) {
-                return iter.hasNext() ? iter.next().value : null;
-            }
+            return windowStore.fetch(key, window.start());
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index c3d95d8..1d8b32b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 
 import java.util.Map;
 
@@ -128,10 +127,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements
KStreamAggPr
             K key = windowedKey.key();
             W window = (W) windowedKey.window();
 
-            // this iterator should only contain one element
-            try (WindowStoreIterator<V> iter = windowStore.fetch(key, window.start(),
window.start())) {
-                return iter.hasNext() ? iter.next().value : null;
-            }
+            return windowStore.fetch(key, window.start());
         }
     }
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message