kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3836: KStreamReduce and KTableReduce should not pass nulls to Deserializers
Date Wed, 06 Jul 2016 19:27:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 a462ebf2f -> ded91fbce


KAFKA-3836: KStreamReduce and KTableReduce should not pass nulls to Deserializers

Minor changes to check values for null before passing them to the deserializers.

Author: Jeyhun Karimov <je.karimov@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1591 from jeyhunkarimov/KAFKA-3836

(cherry picked from commit 7218648ae7ac1ce93f3ff25702b99c3af236fd0f)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ded91fbc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ded91fbc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ded91fbc

Branch: refs/heads/0.10.0
Commit: ded91fbce5c58e5bcc5a178e65a8652f88e1493d
Parents: a462ebf
Author: Jeyhun Karimov <je.karimov@gmail.com>
Authored: Wed Jul 6 12:27:10 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jul 6 12:27:29 2016 -0700

----------------------------------------------------------------------
 .../internals/InMemoryKeyValueLoggedStore.java  |  8 ++++++--
 .../streams/state/internals/RocksDBStore.java   | 21 ++++++++++++++------
 2 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ded91fbc/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index efcdac7..084a85e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -65,8 +65,12 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
             @Override
             public void restore(byte[] key, byte[] value) {
 
-                // directly call inner functions so that the operation is not logged
-                inner.put(serdes.keyFrom(key), serdes.valueFrom(value));
+                // directly call inner functions so that the operation is not logged. Check value for null, to avoid  deserialization error.
+                if (value == null) {
+                    inner.put(serdes.keyFrom(key), null);
+                } else {
+                    inner.put(serdes.keyFrom(key), serdes.valueFrom(value));
+                }
             }
         });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ded91fbc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8f3bab0..d9b41cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -237,17 +237,26 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     public V get(K key) {
         if (cache != null) {
             RocksDBCacheEntry entry = cache.get(key);
-
             if (entry == null) {
-                V value = serdes.valueFrom(getInternal(serdes.rawKey(key)));
-                cache.put(key, new RocksDBCacheEntry(value));
-
-                return value;
+                byte[] byteValue = getInternal(serdes.rawKey(key));
+                //Check value for null, to avoid  deserialization error
+                if (byteValue == null) {
+                    return null;
+                } else {
+                    V value = serdes.valueFrom(byteValue);
+                    cache.put(key, new RocksDBCacheEntry(value));
+                    return value;
+                }
             } else {
                 return entry.value;
             }
         } else {
-            return serdes.valueFrom(getInternal(serdes.rawKey(key)));
+            byte[] byteValue = getInternal(serdes.rawKey(key));
+            if (byteValue == null) {
+                return null;
+            } else {
+                return serdes.valueFrom(byteValue);
+            }
         }
     }
 


Mime
View raw message