kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5047: NullPointerException while using GlobalKTable in KafkaStreams
Date Thu, 20 Apr 2017 21:24:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 14ab3dcc6 -> 3c471d25b


KAFKA-5047: NullPointerException while using GlobalKTable in KafkaStreams

Skip null keys when initializing GlobalKTables. This is in line with what happens during normal
processing.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Michael G. Noll, Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2834 from dguy/kafka-5047


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c471d25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c471d25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c471d25

Branch: refs/heads/trunk
Commit: 3c471d25b97b9a867ec6a2a1ed9467b8e4ba6b66
Parents: 14ab3dc
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Apr 20 14:24:47 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Apr 20 14:24:47 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/kstream/GlobalKTable.java     |  2 +
 .../apache/kafka/streams/kstream/KTable.java    |  2 +
 .../internals/GlobalStateManagerImpl.java       |  4 +-
 .../internals/StoreChangelogReader.java         |  4 +-
 .../internals/GlobalStateManagerImplTest.java   | 23 ++++++++++
 .../processor/internals/StateRestorerTest.java  |  2 +-
 .../internals/StoreChangelogReaderTest.java     | 47 ++++++++++++++------
 .../apache/kafka/test/MockRestoreCallback.java  |  9 +++-
 8 files changed, 75 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3c471d25/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 4ec2ff9..d4be415 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -54,6 +54,8 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  *}</pre>
  * Note that in contrast to {@link KTable} a {@code GlobalKTable}'s state holds a full copy of the underlying topic,
  * thus all keys can be queried locally.
+ * <p>
+ * Records from the source topic that have null keys are dropped.
  *
  * @param <K> Type of primary keys
  * @param <V> Type of value changes

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c471d25/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 2833a01..290142b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -51,6 +51,8 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  *     ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
  *     view.get(key);
  *}</pre>
+ *<p>
+ * Records from the source topic that have null keys are dropped.
  *
  * @param <K> Type of primary keys
  * @param <V> Type of value changes

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c471d25/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f11b200..f75c279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -174,7 +174,9 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                 final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                 for (ConsumerRecord<byte[], byte[]> record : records) {
                     offset = record.offset() + 1;
-                    stateRestoreCallback.restore(record.key(), record.value());
+                    if (record.key() != null) {
+                        stateRestoreCallback.restore(record.key(), record.value());
+                    }
                 }
             }
             checkpointableOffsets.put(topicPartition, offset);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c471d25/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 375992b..7a683f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -212,7 +212,9 @@ public class StoreChangelogReader implements ChangelogReader {
             if (restorer.hasCompleted(offset, endOffset)) {
                 return offset;
             }
-            restorer.restore(record.key(), record.value());
+            if (record.key() != null) {
+                restorer.restore(record.key(), record.value());
+            }
         }
         return consumer.position(restorer.partition());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c471d25/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 8f08051..5eb532e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -404,6 +404,29 @@ public class GlobalStateManagerImplTest {
         assertThat(updatedCheckpoint.get(t1), equalTo(101L));
     }
 
+    @Test
+    public void shouldSkipNullKeysWhenRestoring() throws Exception {
+        final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
+        startOffsets.put(t1, 1L);
+        final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(t1, 2L);
+        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(),
t1.partition(), null, null, null)));
+        consumer.assign(Collections.singletonList(t1));
+        consumer.updateEndOffsets(endOffsets);
+        consumer.updateBeginningOffsets(startOffsets);
+        consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 1, (byte[])
null, "null".getBytes()));
+        final byte[] expectedKey = "key".getBytes();
+        final byte[] expectedValue = "value".getBytes();
+        consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 2, expectedKey,
expectedValue));
+
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.register(store1, false, stateRestoreCallback);
+        final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0);
+        assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key,
restoredKv.value))));
+    }
+
+
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
                                                                                 ProcessorStateManager.CHECKPOINT_FILE_NAME));

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c471d25/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index a847a94..6968f33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -33,7 +33,7 @@ public class StateRestorerTest {
     @Test
     public void shouldCallRestoreOnRestoreCallback() throws Exception {
         restorer.restore(new byte[0], new byte[0]);
-        assertThat(callback.restoreCount, equalTo(1));
+        assertThat(callback.restored.size(), equalTo(1));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c471d25/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index b42406f..2ff6d33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -24,8 +24,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.test.MockRestoreCallback;
+import org.hamcrest.CoreMatchers;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -139,7 +142,7 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE,
true));
 
         changelogReader.restore();
-        assertThat(callback.restoreCount, equalTo(messages));
+        assertThat(callback.restored.size(), equalTo(messages));
     }
 
     @Test
@@ -149,7 +152,7 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE,
true));
 
         changelogReader.restore();
-        assertThat(callback.restoreCount, equalTo(5));
+        assertThat(callback.restored.size(), equalTo(5));
     }
 
     @Test
@@ -169,7 +172,7 @@ public class StoreChangelogReaderTest {
         changelogReader.register(restorer);
 
         changelogReader.restore();
-        assertThat(callback.restoreCount, equalTo(3));
+        assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
     }
 
@@ -189,9 +192,9 @@ public class StoreChangelogReaderTest {
 
         changelogReader.restore();
 
-        assertThat(callback.restoreCount, equalTo(10));
-        assertThat(callbackOne.restoreCount, equalTo(5));
-        assertThat(callbackTwo.restoreCount, equalTo(3));
+        assertThat(callback.restored.size(), equalTo(10));
+        assertThat(callbackOne.restored.size(), equalTo(5));
+        assertThat(callbackTwo.restored.size(), equalTo(3));
     }
 
     @Test
@@ -201,7 +204,7 @@ public class StoreChangelogReaderTest {
         changelogReader.register(restorer);
 
         changelogReader.restore();
-        assertThat(callback.restoreCount, equalTo(0));
+        assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(0L));
     }
 
@@ -214,7 +217,7 @@ public class StoreChangelogReaderTest {
         changelogReader.register(restorer);
 
         changelogReader.restore();
-        assertThat(callback.restoreCount, equalTo(0));
+        assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(endOffset));
     }
 
@@ -236,7 +239,30 @@ public class StoreChangelogReaderTest {
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
     }
 
+    @Test
+    public void shouldIgnoreNullKeysWhenRestoring() throws Exception {
+        assignPartition(3, topicPartition);
+        final byte[] bytes = new byte[0];
+        consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(),
0, bytes, bytes));
+        consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(),
1, (byte[]) null, bytes));
+        consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(),
2, bytes, bytes));
+        consumer.assign(Collections.singletonList(topicPartition));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE,
false));
+        changelogReader.restore();
+
+        assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes,
bytes), KeyValue.pair(bytes, bytes))));
+    }
+
     private void setupConsumer(final long messages, final TopicPartition topicPartition)
{
+        assignPartition(messages, topicPartition);
+
+        for (int i = 0; i < messages; i++) {
+            consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(),
i, new byte[0], new byte[0]));
+        }
+        consumer.assign(Collections.<TopicPartition>emptyList());
+    }
+
+    private void assignPartition(final long messages, final TopicPartition topicPartition)
{
         consumer.updatePartitions(topicPartition.topic(),
                                   Collections.singletonList(
                                           new PartitionInfo(topicPartition.topic(),
@@ -247,11 +273,6 @@ public class StoreChangelogReaderTest {
         consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages)));
         consumer.assign(Collections.singletonList(topicPartition));
-
-        for (int i = 0; i < messages; i++) {
-            consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(),
i, new byte[0], new byte[0]));
-        }
-        consumer.assign(Collections.<TopicPartition>emptyList());
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c471d25/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
index a69c150..096fa11 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
@@ -16,13 +16,18 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class MockRestoreCallback implements StateRestoreCallback {
-    public int restoreCount = 0;
+    public List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();
+
 
     @Override
     public void restore(final byte[] key, final byte[] value) {
-        restoreCount++;
+        restored.add(KeyValue.pair(key, value));
     }
 }


Mime
View raw message