kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5717; InMemoryKeyValueStore should delete keys with null values during restore
Date Wed, 09 Aug 2017 19:03:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 d05112125 -> a75027429


KAFKA-5717; InMemoryKeyValueStore should delete keys with null values during restore

Fixed a bug in the InMemoryKeyValueStore restoration where a key with a `null` value is written
in to the map rather than being deleted.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3650 from dguy/kafka-5717

(cherry picked from commit c35c4798139bc30e3a380311e45a22ba56fcc918)
Signed-off-by: Damian Guy <damian.guy@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a7502742
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a7502742
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a7502742

Branch: refs/heads/0.11.0
Commit: a75027429bf4eff786a62b1223d42db26c42ba20
Parents: d051121
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Aug 9 20:03:28 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Wed Aug 9 20:03:40 2017 +0100

----------------------------------------------------------------------
 .../state/internals/InMemoryKeyValueStore.java  |  4 ++--
 .../internals/InMemoryKeyValueStoreTest.java    | 24 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a7502742/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 41c6de3..7e24969 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -75,9 +75,9 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
             context.register(root, true, new StateRestoreCallback() {
                 @Override
                 public void restore(byte[] key, byte[] value) {
-                    // check value for null, to avoid  deserialization error.
+                    // this is a delete
                     if (value == null) {
-                        put(serdes.keyFrom(key), null);
+                        delete(serdes.keyFrom(key));
                     } else {
                         put(serdes.keyFrom(key), serdes.valueFrom(value));
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a7502742/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 222ec71..541c003 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -20,6 +20,11 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 
 public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
@@ -42,4 +47,23 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
{
         store.init(context, store);
         return store;
     }
+
+    @Test
+    public void shouldRemoveKeysWithNullValues() {
+        store.close();
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(0, "zero");
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(3, "three");
+        driver.addEntryToRestoreLog(0, null);
+
+        store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        context.restore(store.name(), driver.restoredEntries());
+
+        assertEquals(3, driver.sizeOf(store));
+
+        assertThat(store.get(0), nullValue());
+    }
 }


Mime
View raw message