kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: MINOR: add null check for aggregate and reduce operators
Date Fri, 01 Apr 2016 20:15:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 6badd89ad -> fd07af3fd


MINOR: add null check for aggregate and reduce operators

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Yasuhiro Matsuda, Gwen Shapira

Closes #1175 from guozhangwang/KSNullPointerException

(cherry picked from commit ae939467e8aec38f47e2474e74e7ab7ea29c2840)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd07af3f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd07af3f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd07af3f

Branch: refs/heads/0.10.0
Commit: fd07af3fda837a79c5c754575edb7ad25c36666d
Parents: 6badd89
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Apr 1 13:14:47 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Apr 1 13:14:58 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/internals/KStreamAggregate.java       | 5 +++++
 .../apache/kafka/streams/kstream/internals/KStreamReduce.java   | 5 +++++
 .../apache/kafka/streams/kstream/internals/KTableAggregate.java | 5 +++++
 .../apache/kafka/streams/kstream/internals/KTableReduce.java    | 5 +++++
 4 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fd07af3f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index f41bfa6..871a12d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -62,6 +63,10 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
         @Override
         public void process(K key, V value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KStream aggregate operator with
state " + storeName + " should not be null.");
+
             T oldAgg = store.get(key);
 
             if (oldAgg == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd07af3f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 0ec0465..e37fe34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -59,6 +60,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K,
K, V,
 
         @Override
         public void process(K key, V value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KStream reduce operator with state
" + storeName + " should not be null.");
+
             V oldAgg = store.get(key);
             V newAgg = oldAgg;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd07af3f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 6ce776a..806c6e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -64,6 +65,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K,
V, T
 
         @Override
         public void process(K key, Change<V> value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable aggregate operator with
state " + storeName + " should not be null.");
+
             T oldAgg = store.get(key);
 
             if (oldAgg == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fd07af3f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 0d1b55a..d56b3ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K,
V, V> {
 
         @Override
         public void process(K key, Change<V> value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable reduce operator with state
" + storeName + " should not be null.");
+
             V oldAgg = store.get(key);
             V newAgg = oldAgg;
 


Mime
View raw message