kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6036: Follow-up; cleanup sendOldValues logic ForwardingCacheFlushListener (#6017)
Date Sun, 09 Dec 2018 23:33:28 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab156fd  KAFKA-6036: Follow-up; cleanup sendOldValues logic ForwardingCacheFlushListener
 (#6017)
ab156fd is described below

commit ab156fded1548b6cf06aff56bb5193c37a8f7e51
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Sun Dec 9 15:33:17 2018 -0800

    KAFKA-6036: Follow-up; cleanup sendOldValues logic ForwardingCacheFlushListener  (#6017)
    
    This is a follow-up PR from the previous PR #5779, where KTableSource always gets old values
from the store even if sendOldValues is false. It prompted me to make a pass over all the KTable/KStreamXXX
processors to push the sendOldValues check to the callers in order to avoid unnecessary store reads.
    
    More details: ForwardingCacheFlushListener and TupleForwarder both need sendOldValues
as parameters.
    a. For ForwardingCacheFlushListener it is not needed at all, since its callers XXXCachedStore
already use the sendOldValues values passed from TupleForwarder to avoid getting old values
from underlying stores.
    b. For TupleForwarder, it actually only needs to pass the boolean flag to the cached store;
it does not need to keep the flag as its own variable, since the cached store already respects
the boolean to pass null or the actual value.
    
    The only other minor bug I found during the pass is in KTableKTableJoinMerger, where we always
pass old values and ignore sendOldValues.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>
---
 .../kstream/internals/ForwardingCacheFlushListener.java        | 10 ++--------
 .../kafka/streams/kstream/internals/KStreamAggregate.java      |  6 +++---
 .../apache/kafka/streams/kstream/internals/KStreamReduce.java  |  4 ++--
 .../kstream/internals/KStreamSessionWindowAggregate.java       |  6 ++----
 .../streams/kstream/internals/KStreamWindowAggregate.java      |  4 ++--
 .../kafka/streams/kstream/internals/KTableAggregate.java       |  4 ++--
 .../apache/kafka/streams/kstream/internals/KTableFilter.java   |  2 +-
 .../streams/kstream/internals/KTableKTableJoinMerger.java      | 10 +++++++---
 .../kafka/streams/kstream/internals/KTableMapValues.java       |  2 +-
 .../apache/kafka/streams/kstream/internals/KTableReduce.java   |  4 ++--
 .../apache/kafka/streams/kstream/internals/KTableSource.java   |  2 +-
 .../kafka/streams/kstream/internals/KTableTransformValues.java |  2 +-
 .../apache/kafka/streams/kstream/internals/TupleForwarder.java |  8 +-------
 13 files changed, 27 insertions(+), 37 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
index f30ab79..4065ced 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
@@ -22,13 +22,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V>
{
     private final InternalProcessorContext context;
-    private final boolean sendOldValues;
     private final ProcessorNode myNode;
 
-    ForwardingCacheFlushListener(final ProcessorContext context, final boolean sendOldValues)
{
+    ForwardingCacheFlushListener(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
         myNode = this.context.currentNode();
-        this.sendOldValues = sendOldValues;
     }
 
     @Override
@@ -36,11 +34,7 @@ class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K,
V> {
         final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            if (sendOldValues) {
-                context.forward(key, new Change<>(newValue, oldValue));
-            } else {
-                context.forward(key, new Change<>(newValue, null));
-            }
+            context.forward(key, new Change<>(newValue, oldValue));
         } finally {
             context.setCurrentNode(prev);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 1b3a8f4..648c50b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -54,8 +54,8 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
     private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
 
         private KeyValueStore<K, T> store;
-        private TupleForwarder<K, T> tupleForwarder;
         private StreamsMetricsImpl metrics;
+        private TupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -63,7 +63,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context), sendOldValues);
         }
 
 
@@ -92,7 +92,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 9f404ea..09e4fab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -61,7 +61,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K,
K, V,
             metrics = (StreamsMetricsImpl) context.metrics();
 
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context), sendOldValues);
         }
 
 
@@ -89,7 +89,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K,
K, V,
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index b89399b..13f4a6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -92,7 +92,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
             lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
 
             store = (SessionStore<K, Agg>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context), sendOldValues);
         }
 
         @Override
@@ -135,7 +135,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements
KStreamAggProce
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<K>, Agg> session : merged) {
                         store.remove(session.key);
-                        tupleForwarder.maybeForward(session.key, null, session.value);
+                        tupleForwarder.maybeForward(session.key, null, sendOldValues ? session.value
: null);
                     }
                 }
 
@@ -151,10 +151,8 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements
KStreamAggProce
                 lateRecordDropSensor.record();
             }
         }
-
     }
 
-
     private SessionWindow mergeSessionWindow(final SessionWindow one, final SessionWindow
two) {
         final long start = one.start() < two.start() ? one.start() : two.start();
         final long end = one.end() > two.end() ? one.end() : two.end();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index f292515..0edbe4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -87,7 +87,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements
KStr
             lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
 
             windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>,
V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>,
V>(context), sendOldValues);
         }
 
         @Override
@@ -122,7 +122,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window>
implements KStr
 
                     // update the store with the new value
                     windowStore.put(key, newAgg, windowStart);
-                    tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()),
newAgg, oldAgg);
+                    tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()),
newAgg, sendOldValues ? oldAgg : null);
                 } else {
                     log.debug(
                         "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}]
offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index b60f9ab..b04a729 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -62,7 +62,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K,
V, T
         public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context), sendOldValues);
         }
 
         /**
@@ -95,7 +95,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K,
V, T
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
 
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 75fba99..d1e524c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -70,7 +70,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K,
V, V> {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context, sendOldValues), sendOldValues);
+                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context), sendOldValues);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 2ed70bd..78c1dc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -89,7 +89,7 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
V, V> {
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
                 tupleForwarder = new TupleForwarder<>(store, context,
-                    new ForwardingCacheFlushListener<K, V>(context, sendOldValues),
+                    new ForwardingCacheFlushListener<K, V>(context),
                     sendOldValues);
             }
         }
@@ -98,9 +98,13 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K,
V, V> {
         public void process(final K key, final Change<V> value) {
             if (queryableName != null) {
                 store.put(key, value.newValue);
-                tupleForwarder.maybeForward(key, value.newValue, value.oldValue);
+                tupleForwarder.maybeForward(key, value.newValue, sendOldValues ? value.oldValue
: null);
             } else {
-                context().forward(key, value);
+                if (sendOldValues) {
+                    context().forward(key, value);
+                } else {
+                    context().forward(key, new Change<>(value.newValue, null));
+                }
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index c2c84d5..2317947 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -90,7 +90,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K,
V, V1> {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V1>(context, sendOldValues), sendOldValues);
+                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V1>(context), sendOldValues);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 069b360..38c5a11 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -57,7 +57,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K,
V, V> {
         public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context), sendOldValues);
         }
 
         /**
@@ -89,7 +89,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K,
V, V> {
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 274d96e..6fc57bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -77,7 +77,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K,
V> {
             metrics = (StreamsMetricsImpl) context.metrics();
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context, sendOldValues), sendOldValues);
+                tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K,
V>(context), sendOldValues);
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index b3e84d7..88cea4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -91,7 +91,7 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K,
V, V
             valueTransformer.init(new ForwardingDisabledProcessorContext(context));
 
             if (queryableName != null) {
-                final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context,
sendOldValues);
+                final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context);
                 store = (KeyValueStore<K, V1>) context.getStateStore(queryableName);
                 tupleForwarder = new TupleForwarder<>(store, context, flushListener,
sendOldValues);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index ff3ef44..aec0d16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore;
 class TupleForwarder<K, V> {
     private final CachedStateStore cachedStateStore;
     private final ProcessorContext context;
-    private final boolean sendOldValues;
 
     @SuppressWarnings("unchecked")
     TupleForwarder(final StateStore store,
@@ -41,7 +40,6 @@ class TupleForwarder<K, V> {
                    final boolean sendOldValues) {
         this.cachedStateStore = cachedStateStore(store);
         this.context = context;
-        this.sendOldValues = sendOldValues;
         if (this.cachedStateStore != null) {
             cachedStateStore.setFlushListener(flushListener, sendOldValues);
         }
@@ -61,11 +59,7 @@ class TupleForwarder<K, V> {
                              final V newValue,
                              final V oldValue) {
         if (cachedStateStore == null) {
-            if (sendOldValues) {
-                context.forward(key, new Change<>(newValue, oldValue));
-            } else {
-                context.forward(key, new Change<>(newValue, null));
-            }
+            context.forward(key, new Change<>(newValue, oldValue));
         }
     }
 }


Mime
View raw message