kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6826 avoid range scans when forwarding values during aggregation (#4927)
Date Wed, 25 Apr 2018 15:36:21 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 34a1f70  KAFKA-6826 avoid range scans when forwarding values during aggregation (#4927)
34a1f70 is described below

commit 34a1f7099b65f43037602eb13cbffd7df276e290
Author: Xavier Léauté <xvrl@users.noreply.github.com>
AuthorDate: Wed Apr 25 08:36:14 2018 -0700

    KAFKA-6826 avoid range scans when forwarding values during aggregation (#4927)
    
    Reviewers: Matthias J Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/streams/state/internals/CachingWindowStore.java      | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 9ef41ce..58111a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -219,13 +219,11 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
     }
     
     private V fetchPrevious(final Bytes key, final long timestamp) {
-        try (final WindowStoreIterator<byte[]> iter = underlying.fetch(key, timestamp, timestamp)) {
-            if (!iter.hasNext()) {
-                return null;
-            } else {
-                return serdes.valueFrom(iter.next().value);
-            }
+        final byte[] value = underlying.fetch(key, timestamp);
+        if (value != null) {
+            return serdes.valueFrom(value);
         }
+        return null;
     }
     
     @Override
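
For readers skimming the diff above, here is a minimal, self-contained sketch of the idea behind the change. It is not Kafka code: FetchPreviousSketch and its in-memory TreeMap are hypothetical stand-ins for the underlying window store, and the method names are made up for illustration. It only shows that a range scan over [timestamp, timestamp] and a single point lookup return the same value, while the point lookup avoids creating an iterator.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a window store holding the windows of ONE key:
// it maps a window start timestamp to the stored value.
class FetchPreviousSketch {
    private final NavigableMap<Long, String> windows = new TreeMap<>();

    void put(final long timestamp, final String value) {
        windows.put(timestamp, value);
    }

    // Old shape: open a range view over [timestamp, timestamp] and
    // return the first entry's value, or null if the range is empty.
    String fetchPreviousViaRange(final long timestamp) {
        final Iterator<Map.Entry<Long, String>> iter =
            windows.subMap(timestamp, true, timestamp, true).entrySet().iterator();
        return iter.hasNext() ? iter.next().getValue() : null;
    }

    // New shape: a single point lookup; returns null when nothing is stored.
    String fetchPreviousViaPointLookup(final long timestamp) {
        return windows.get(timestamp);
    }

    public static void main(final String[] args) {
        final FetchPreviousSketch store = new FetchPreviousSketch();
        store.put(100L, "a");
        store.put(200L, "b");
        System.out.println(store.fetchPreviousViaRange(100L));
        System.out.println(store.fetchPreviousViaPointLookup(100L));
        System.out.println(store.fetchPreviousViaPointLookup(150L));
    }
}
```

In this toy model the two methods are interchangeable for any timestamp; the sketch makes no claim about the actual cost difference in the real store.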

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.
