kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: fix for standby tasks using batching restore
Date Mon, 07 Aug 2017 16:54:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f15cdc73d -> 1f0ad0121


HOTFIX: fix for standby tasks using batching restore

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3625 from bbejeck/HOTFIX_need_to_correct_stanby_task_restoration_to_use_new_restore_api


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1f0ad012
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1f0ad012
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1f0ad012

Branch: refs/heads/trunk
Commit: 1f0ad0121e03a2e4f6c0cf3551b14c75085bd34f
Parents: f15cdc7
Author: Bill Bejeck <bill@confluent.io>
Authored: Mon Aug 7 09:54:43 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Aug 7 09:54:43 2017 -0700

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java        | 27 ++++++--
 .../internals/CompositeRestoreListenerTest.java | 26 ++------
 .../internals/ProcessorStateManagerTest.java    | 69 +++++++++++++++++++-
 .../test/MockBatchingStateRestoreListener.java  | 48 ++++++++++++++
 4 files changed, 139 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1f0ad012/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 9f45ddd..623fd3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -18,9 +18,11 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -204,19 +206,16 @@ public class ProcessorStateManager implements StateManager {
                                                              final List<ConsumerRecord<byte[],
byte[]>> records) {
         final long limit = offsetLimit(storePartition);
         List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
+        final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
 
         // restore states from changelog records
-        final StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
+        final BatchingStateRestoreCallback restoreCallback = getBatchingRestoreCallback(restoreCallbacks.get(storePartition.topic()));
 
         long lastOffset = -1L;
         int count = 0;
         for (final ConsumerRecord<byte[], byte[]> record : records) {
             if (record.offset() < limit) {
-                try {
-                    restoreCallback.restore(record.key(), record.value());
-                } catch (final Exception e) {
-                    throw new ProcessorStateException(String.format("%s exception caught while trying to restore state from %s", logPrefix, storePartition), e);
-                }
+                restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                 lastOffset = record.offset();
             } else {
                 if (remainingRecords == null) {
@@ -228,6 +227,14 @@ public class ProcessorStateManager implements StateManager {
             count++;
         }
 
+        if (!restoreRecords.isEmpty()) {
+            try {
+                restoreCallback.restoreAll(restoreRecords);
+            } catch (final Exception e) {
+                throw new ProcessorStateException(String.format("%s exception caught while trying to restore state from %s", logPrefix, storePartition), e);
+            }
+        }
+
         // record the restored offset for its change log partition
         restoredOffsets.put(storePartition, lastOffset + 1);
 
@@ -358,4 +365,12 @@ public class ProcessorStateManager implements StateManager {
     public StateStore getGlobalStore(final String name) {
         return globalStores.get(name);
     }
+
+    private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback callback) {
+        if (callback instanceof BatchingStateRestoreCallback) {
+            return (BatchingStateRestoreCallback) callback;
+        }
+
+        return new WrappedBatchingStateRestoreCallback(callback);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f0ad012/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
index 4a7877a..88aba94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.test.MockBatchingStateRestoreListener;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.junit.Test;
 
@@ -39,9 +40,7 @@ import static org.junit.Assert.assertTrue;
 public class CompositeRestoreListenerTest {
 
     private final MockStateRestoreCallback stateRestoreCallback = new MockStateRestoreCallback();
-    private final MockBatchingStateRestoreCallback
-        batchingStateRestoreCallback =
-        new MockBatchingStateRestoreCallback();
+    private final MockBatchingStateRestoreListener batchingStateRestoreCallback = new MockBatchingStateRestoreListener();
     private final MockNoListenBatchingStateRestoreCallback
         noListenBatchingStateRestoreCallback =
         new MockNoListenBatchingStateRestoreCallback();
@@ -71,7 +70,7 @@ public class CompositeRestoreListenerTest {
     public void shouldRestoreInBatchMode() {
         setUpCompositeRestoreListener(batchingStateRestoreCallback);
         compositeRestoreListener.restoreAll(records);
-        assertThat(batchingStateRestoreCallback.restoredRecords, is(records));
+        assertThat(batchingStateRestoreCallback.getRestoredRecords(), is(records));
     }
 
     @Test
@@ -132,7 +131,7 @@ public class CompositeRestoreListenerTest {
         compositeRestoreListener.onBatchRestored(topicPartition, storeName, batchOffset, numberRestored);
         compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
 
-        assertThat(batchingStateRestoreCallback.restoredRecords, is(records));
+        assertThat(batchingStateRestoreCallback.getRestoredRecords(), is(records));
         assertStateRestoreOnEndNotification(batchingStateRestoreCallback);
     }
 
@@ -196,23 +195,6 @@ public class CompositeRestoreListenerTest {
         }
     }
 
-    private static class MockBatchingStateRestoreCallback extends MockStateRestoreListener
-        implements BatchingStateRestoreCallback {
-
-        Collection<KeyValue<byte[], byte[]>> restoredRecords;
-
-        @Override
-        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
-            restoredRecords = records;
-        }
-
-        @Override
-        public void restore(final byte[] key, final byte[] value) {
-            throw new IllegalStateException("Should not be called");
-
-        }
-    }
-
     private static class MockNoListenBatchingStateRestoreCallback implements BatchingStateRestoreCallback {
 
         Collection<KeyValue<byte[], byte[]>> restoredRecords;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f0ad012/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index f454216..369987e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.test.MockBatchingStateRestoreListener;
 import org.apache.kafka.test.MockChangelogReader;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.TestUtils;
@@ -36,6 +38,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
+import java.nio.charset.Charset;
 import java.nio.file.StandardOpenOption;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,6 +47,7 @@ import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -68,6 +72,10 @@ public class ProcessorStateManagerTest {
     private final TaskId taskId = new TaskId(0, 1);
     private final MockChangelogReader changelogReader = new MockChangelogReader();
     private final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(storeName, true);
+    private final byte[] key = new byte[]{0x0, 0x0, 0x0, 0x1};
+    private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8"));
+    private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(changelogTopic, 0, 0, key, value);
+
     private File baseDir;
     private File checkpointFile;
     private OffsetCheckpoint checkpoint;
@@ -88,11 +96,48 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
+    public void shouldRestoreStoreWithBatchingRestoreSpecification() throws Exception {
+        final TaskId taskId = new TaskId(0, 2);
+        final MockBatchingStateRestoreListener batchingRestoreCallback = new MockBatchingStateRestoreListener();
+
+        final KeyValue<byte[], byte[]> expectedKeyValue = KeyValue.pair(key, value);
+
+        final MockStateStoreSupplier.MockStateStore persistentStore = getPersistentStore();
+        final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
+
+        try {
+            stateMgr.register(persistentStore, true, batchingRestoreCallback);
+            stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList(consumerRecord));
+            assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
+            assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
+        } finally {
+            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+        }
+    }
+
+    @Test
+    public void shouldRestoreStoreWithSinglePutRestoreSpecification() throws Exception {
+        final TaskId taskId = new TaskId(0, 2);
+        final Integer intKey = 1;
+
+        final MockStateStoreSupplier.MockStateStore persistentStore = getPersistentStore();
+        final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
+
+        try {
+            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+            stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList(consumerRecord));
+            assertThat(persistentStore.keys.size(), is(1));
+            assertTrue(persistentStore.keys.contains(intKey));
+        } finally {
+            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+        }
+    }
+
+    @Test
     public void testRegisterPersistentStore() throws IOException {
         final TaskId taskId = new TaskId(0, 2);
 
-        final MockStateStoreSupplier.MockStateStore persistentStore
-            = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
+        final MockStateStoreSupplier.MockStateStore persistentStore = getPersistentStore();
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -316,7 +361,6 @@ public class ProcessorStateManagerTest {
             false);
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
-
         stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));
         final Map<TopicPartition, Long> read = checkpoint.read();
         assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 11L)));
@@ -509,4 +553,23 @@ public class ProcessorStateManagerTest {
         }
     }
 
+    private ProcessorStateManager getStandByStateManager(TaskId taskId) throws IOException {
+        return new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            true,
+            stateDirectory,
+            new HashMap<String, String>() {
+                {
+                    put(persistentStoreName, persistentStoreTopicName);
+                }
+            },
+            changelogReader,
+            false);
+    }
+
+    private MockStateStoreSupplier.MockStateStore getPersistentStore() {
+        return new MockStateStoreSupplier.MockStateStore("persistentStore", true);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f0ad012/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java
new file mode 100644
index 0000000..9e949b1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class MockBatchingStateRestoreListener extends MockStateRestoreListener implements BatchingStateRestoreCallback {
+
+    private final Collection<KeyValue<byte[], byte[]>> restoredRecords = new ArrayList<>();
+
+    @Override
+    public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
+        restoredRecords.addAll(records);
+    }
+
+    @Override
+    public void restore(final byte[] key, final byte[] value) {
+        throw new IllegalStateException("Should not be called");
+
+    }
+
+    public void resetRestoredBatch() {
+        restoredRecords.clear();
+    }
+
+    public Collection<KeyValue<byte[], byte[]>> getRestoredRecords() {
+        return restoredRecords;
+    }
+}


Mime
View raw message