kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3500: Handle null keys and values in KafkaOffsetBackingStore.
Date Wed, 27 Jul 2016 02:43:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 dc93f3bf4 -> 817f23768


KAFKA-3500: Handle null keys and values in KafkaOffsetBackingStore.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Ismael Juma, Jason Gustafson, Gwen Shapira

Closes #1662 from ewencp/kafka-3500-kafka-offset-backing-store-null

(cherry picked from commit 4059f07216a07db0cdd88b46db40914069171838)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/817f2376
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/817f2376
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/817f2376

Branch: refs/heads/0.10.0
Commit: 817f237683750c815036c09a0c0fa64d4f45bff7
Parents: dc93f3b
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Jul 26 19:43:32 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Jul 26 19:43:49 2016 -0700

----------------------------------------------------------------------
 .../storage/KafkaOffsetBackingStore.java        |  7 +-
 .../storage/KafkaOffsetBackingStoreTest.java    | 71 ++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/817f2376/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index e8984fb..9219986 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -118,8 +118,11 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void>
callback) {
         SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback);
 
-        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
-            offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback);
+        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
+            ByteBuffer key = entry.getKey();
+            ByteBuffer value = entry.getValue();
+            offsetLog.send(key == null ? null : key.array(), value == null ? null : value.array(),
producerCallback);
+        }
 
         return producerCallback;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/817f2376/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 38e0f7b..4a244f0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -271,6 +272,76 @@ public class KafkaOffsetBackingStoreTest {
     }
 
     @Test
+    public void testGetSetNull() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
+
+        // Set offsets
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+        PowerMock.expectLastCall();
+        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1));
+        PowerMock.expectLastCall();
+
+        // Second get() should get the produced data and return the new values
+        final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC,
0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) null, TP0_VALUE.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC,
1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), (byte[]) null));
+                secondGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        store.start();
+
+        // Set offsets using null keys and values
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(null, TP0_VALUE);
+        toSet.put(TP1_KEY, null);
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                invoked.set(true);
+            }
+        });
+        assertFalse(setFuture.isDone());
+        // Out of order callbacks shouldn't matter, should still require all to be invoked
before invoking the callback
+        // for the store's set callback
+        callback1.getValue().onCompletion(null, null);
+        assertFalse(invoked.get());
+        callback0.getValue().onCompletion(null, null);
+        setFuture.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(invoked.get());
+
+        // Getting data should read to end of our published data and return it
+        final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
+        store.get(Arrays.asList(null, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>()
{
+            @Override
+            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result)
{
+                assertEquals(TP0_VALUE, result.get(null));
+                assertNull(result.get(TP1_KEY));
+                secondGetInvokedAndPassed.set(true);
+            }
+        }).get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(secondGetInvokedAndPassed.get());
+
+        store.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testSetFailure() throws Exception {
         expectConfigure();
         expectStart(Collections.EMPTY_LIST);


Mime
View raw message