kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3887: KAFKA-3817 follow-up to avoid forwarding value if it is null in KTableRepartition
Date Fri, 08 Jul 2016 15:36:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 ded91fbce -> 4c502ed83


KAFKA-3887: KAFKA-3817 follow-up to avoid forwarding value if it is null in KTableRepartition

Also handle Null value in SmokeTestUtil.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #1597 from guozhangwang/KHotfix-check-null

(cherry picked from commit 730bf9a37a08b2ca41dcda52d2c70e92e85980f7)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c502ed8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c502ed8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c502ed8

Branch: refs/heads/0.10.0
Commit: 4c502ed83dc4ec3455875e6b13486719d689211e
Parents: ded91fb
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Jul 8 08:36:20 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Jul 8 08:36:38 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/internals/KTableRepartitionMap.java   | 5 +++--
 .../java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java  | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4c502ed8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index bba1857..ac7c00e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -76,8 +76,9 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
             if (key == null)
                 throw new StreamsException("Record key for the grouping KTable should not
be null.");
 
-            KeyValue<K1, V1> newPair = mapper.apply(key, change.newValue);
-            KeyValue<K1, V1> oldPair = mapper.apply(key, change.oldValue);
+            // if the value is null, we do not need to forward its selected key-value further
+            KeyValue<K1, V1> newPair = change.newValue == null ? null : mapper.apply(key,
change.newValue);
+            KeyValue<K1, V1> oldPair = change.oldValue == null ? null : mapper.apply(key,
change.oldValue);
 
             // if the selected repartition key or value is null, skip
             if (newPair != null && newPair.key != null && newPair.value !=
null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4c502ed8/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index b0d7a0b..f1c237e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -87,7 +87,7 @@ public class SmokeTestUtil {
             return new KeyValueMapper<String, Long, KeyValue<String, Long>>()
{
                 @Override
                 public KeyValue<String, Long> apply(String key, Long value) {
-                    return new KeyValue<>(Long.toString(value), 1L);
+                    return new KeyValue<>(value == null ? null : Long.toString(value),
1L);
                 }
             };
         }


Mime
View raw message