kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3500: Handle null keys and values in KafkaOffsetBackingStore.
Date Mon, 25 Jul 2016 21:25:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/kafka-3500-kafka-offset-backing-store-null [created] 55e1fc140


KAFKA-3500: Handle null keys and values in KafkaOffsetBackingStore.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55e1fc14
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55e1fc14
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55e1fc14

Branch: refs/heads/kafka-3500-kafka-offset-backing-store-null
Commit: 55e1fc1408da05bb443c89a758f3c4f0e09056d6
Parents: aebab7c
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Mon Jul 25 13:57:41 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Jul 25 13:57:41 2016 -0700

----------------------------------------------------------------------
 .../storage/KafkaOffsetBackingStore.java        |  7 +-
 .../storage/KafkaOffsetBackingStoreTest.java    | 71 ++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/55e1fc14/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index e8984fb..9219986 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -118,8 +118,11 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
         SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback);
 
-        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
-            offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback);
+        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
+            ByteBuffer key = entry.getKey();
+            ByteBuffer value = entry.getValue();
+            offsetLog.send(key == null ? null : key.array(), value == null ? null : value.array(), producerCallback);
+        }
 
         return producerCallback;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/55e1fc14/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 38e0f7b..4a244f0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -271,6 +272,76 @@ public class KafkaOffsetBackingStoreTest {
     }
 
     @Test
+    public void testGetSetNull() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
+
+        // Set offsets
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+        PowerMock.expectLastCall();
+        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1));
+        PowerMock.expectLastCall();
+
+        // Second get() should get the produced data and return the new values
+        final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) null, TP0_VALUE.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), (byte[]) null));
+                secondGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        store.start();
+
+        // Set offsets using null keys and values
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(null, TP0_VALUE);
+        toSet.put(TP1_KEY, null);
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                invoked.set(true);
+            }
+        });
+        assertFalse(setFuture.isDone());
+        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
+        // for the store's set callback
+        callback1.getValue().onCompletion(null, null);
+        assertFalse(invoked.get());
+        callback0.getValue().onCompletion(null, null);
+        setFuture.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(invoked.get());
+
+        // Getting data should read to end of our published data and return it
+        final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
+        store.get(Arrays.asList(null, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+                assertEquals(TP0_VALUE, result.get(null));
+                assertNull(result.get(TP1_KEY));
+                secondGetInvokedAndPassed.set(true);
+            }
+        }).get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(secondGetInvokedAndPassed.get());
+
+        store.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testSetFailure() throws Exception {
         expectConfigure();
         expectStart(Collections.EMPTY_LIST);


Mime
View raw message