kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: Align KTableAgg and KTableReduce (#6712)
Date Sat, 11 May 2019 09:55:14 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7c20257  MINOR: Align KTableAgg and KTableReduce (#6712)
7c20257 is described below

commit 7c20257d98c7037efc74e4159d4b0adc1741d891
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Sat May 11 11:54:58 2019 +0200

    MINOR: Align KTableAgg and KTableReduce (#6712)
    
    Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Jeff Kim <kimkb2011@gmail.com>, Guozhang Wang <guozhang@confluent.io>
---
 .../streams/kstream/internals/KStreamReduce.java   |  1 -
 .../streams/kstream/internals/KTableAggregate.java | 27 ++++++++++++++--------
 .../streams/kstream/internals/KTableReduce.java    | 23 ++++++++++--------
 3 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 09e4fab..cd67283 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -59,7 +59,6 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
-
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index c53224e..1c44486 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -75,22 +75,29 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
                 throw new StreamsException("Record key for KTable aggregate operator with state " + storeName + " should not be null.");
             }
 
-            T oldAgg = store.get(key);
-
-            if (oldAgg == null) {
-                oldAgg = initializer.apply();
-            }
-
-            T newAgg = oldAgg;
+            final T oldAgg = store.get(key);
+            final T intermediateAgg;
 
             // first try to remove the old value
-            if (value.oldValue != null) {
-                newAgg = remove.apply(key, value.oldValue, newAgg);
+            if (value.oldValue != null && oldAgg != null) {
+                intermediateAgg = remove.apply(key, value.oldValue, oldAgg);
+            } else {
+                intermediateAgg = oldAgg;
             }
 
             // then try to add the new value
+            final T newAgg;
             if (value.newValue != null) {
-                newAgg = add.apply(key, value.newValue, newAgg);
+                final T initializedAgg;
+                if (intermediateAgg == null) {
+                    initializedAgg = initializer.apply();
+                } else {
+                    initializedAgg = intermediateAgg;
+                }
+
+                newAgg = add.apply(key, value.newValue, initializedAgg);
+            } else {
+                newAgg = intermediateAgg;
             }
 
             // update the store with the new value
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 38c5a11..70db6443 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -71,20 +71,25 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
             }
 
             final V oldAgg = store.get(key);
-            V newAgg = oldAgg;
+            final V intermediateAgg;
 
-            // first try to add the new value
+            // first try to remove the old value
+            if (value.oldValue != null && oldAgg != null) {
+                intermediateAgg = removeReducer.apply(oldAgg, value.oldValue);
+            } else {
+                intermediateAgg = oldAgg;
+            }
+
+            // then try to add the new value
+            final V newAgg;
             if (value.newValue != null) {
-                if (newAgg == null) {
+                if (intermediateAgg == null) {
                     newAgg = value.newValue;
                 } else {
-                    newAgg = addReducer.apply(newAgg, value.newValue);
+                    newAgg = addReducer.apply(intermediateAgg, value.newValue);
                 }
-            }
-
-            // then try to remove the old value
-            if (value.oldValue != null) {
-                newAgg = removeReducer.apply(newAgg, value.oldValue);
+            } else {
+                newAgg = intermediateAgg;
             }
 
             // update the store with the new value


Mime
View raw message