kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3902: Optimize KTable.filter in Streams DSL to avoid forwarding if both old and new values are null
Date Fri, 01 Jul 2016 23:47:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 29c3c4ba0 -> 7b01f848a


KAFKA-3902: Optimize KTable.filter in Streams DSL to avoid forwarding if both old and new
values are null

The contribution is my original work, and I license the work to the project under the
project's open source license.

Contributors: Guozhang Wang, Phil Derome
guozhangwang

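Added checkEmptyAndClearProcessResult to MockProcessorSupplier to verify that a processor
forwards nothing. Also added a check in KTableFilter that skips forwarding when both the old and
new values are null, which fixes the issue.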

Author: Philippe Derome <phderome@gmail.com>
Author: Phil Derome <phderome@gmail.com>
Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1556 from phderome/DEROME-3902

(cherry picked from commit 2098529b44cad78731e478aa8af2b49e9c94db7d)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b01f848
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b01f848
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b01f848

Branch: refs/heads/0.10.0
Commit: 7b01f848a03d7f93a89012bf58fe40fb96c42247
Parents: 29c3c4b
Author: Philippe Derome <phderome@gmail.com>
Authored: Fri Jul 1 16:47:10 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jul 1 16:47:23 2016 -0700

----------------------------------------------------------------------
 .../streams/kstream/internals/KTableFilter.java |  3 ++
 .../kstream/internals/KTableFilterTest.java     | 39 +++++++++++++++++++-
 .../kafka/test/MockProcessorSupplier.java       |  6 +++
 3 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7b01f848/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 080fd9d..ff0c67f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -77,6 +77,9 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K,
V, V> {
             V newValue = computeValue(key, change.newValue);
             V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
 
+            if (sendOldValues && oldValue == null && newValue == null)
+                return; // unnecessary to forward here.
+
             context().forward(key, new Change<>(newValue, oldValue));
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7b01f848/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index a3af133..e328bae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -254,7 +256,7 @@ public class KTableFilterTest {
         driver.process(topic1, "C", 1);
 
         proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+        proc2.checkEmptyAndClearProcessResult();
 
         driver.process(topic1, "A", 2);
         driver.process(topic1, "B", 2);
@@ -271,7 +273,40 @@ public class KTableFilterTest {
         driver.process(topic1, "B", null);
 
         proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
-        proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
+        proc2.checkAndClearProcessResult("B:(null<-2)");
     }
 
+    @Test
+    public void testSkipNullOnMaterialization() throws IOException {
+        // Do not explicitly set enableSendingOldValues. Let a further downstream stateful
operator trigger it instead.
+        KStreamBuilder builder = new KStreamBuilder();
+
+        String topic1 = "topic1";
+
+        KTableImpl<String, String, String> table1 =
+            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde,
topic1);
+        KTableImpl<String, String, String> table2 = (KTableImpl<String, String,
String>) table1.filter(
+            new Predicate<String, String>() {
+                @Override
+                public boolean test(String key, String value) {
+                    return value.equalsIgnoreCase("accept");
+                }
+            }).groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");
+
+        MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
+        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
+
+        builder.addProcessor("proc1", proc1, table1.name);
+        builder.addProcessor("proc2", proc2, table2.name);
+
+        driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
+
+        driver.process(topic1, "A", "reject");
+        driver.process(topic1, "B", "reject");
+        driver.process(topic1, "C", "reject");
+
+        proc1.checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
+        proc2.checkEmptyAndClearProcessResult();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7b01f848/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index 9cf0eb2..67d25f5 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -82,6 +82,12 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K,
V> {
         processed.clear();
     }
 
+    public void checkEmptyAndClearProcessResult() {
+
+        assertEquals("the number of outputs:", 0, processed.size());
+        processed.clear();
+    }
+
     public void checkAndClearPunctuateResult(long... expected) {
         assertEquals("the number of outputs:", expected.length, punctuated.size());
 


Mime
View raw message