kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-5363 (KIP-167): implementing bulk load, restoration event notification
Date Fri, 28 Jul 2017 18:30:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4003c9384 -> c50c941af


http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallbackTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallbackTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallbackTest.java
new file mode 100644
index 0000000..d04d1d5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallbackTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
+import org.apache.kafka.test.MockRestoreCallback;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class WrappedBatchingStateRestoreCallbackTest {
+
+    private final MockRestoreCallback mockRestoreCallback = new MockRestoreCallback();
+    private final byte[] key = "key".getBytes(Charset.forName("UTF-8"));
+    private final byte[] value = "value".getBytes(Charset.forName("UTF-8"));
+    private final Collection<KeyValue<byte[], byte[]>> records = Collections.singletonList(KeyValue.pair(key, value));
+    private final BatchingStateRestoreCallback wrappedBatchingStateRestoreCallback = new WrappedBatchingStateRestoreCallback(mockRestoreCallback);
+
+    @Test
+    public void shouldRestoreSinglePutsFromArray() {
+        wrappedBatchingStateRestoreCallback.restoreAll(records);
+        assertThat(mockRestoreCallback.restored, is(records));
+        KeyValue<byte[], byte[]> record = mockRestoreCallback.restored.get(0);
+        assertThat(record.key, is(key));
+        assertThat(record.value, is(value));
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 90a142d..8786b15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -17,34 +17,36 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.rocksdb.Options;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.io.IOException;
-import org.rocksdb.Options;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class RocksDBStoreTest {
     private final File tempDir = TestUtils.tempDirectory();
@@ -121,6 +123,38 @@ public class RocksDBStoreTest {
     }
 
     @Test
+    public void shouldTogglePrepareForBulkLoadDuringRestoreCalls() throws Exception {
+        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
+        entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
+
+        final AtomicReference<Exception> conditionNotMet = new AtomicReference<>();
+        final AtomicInteger conditionCheckCount = new AtomicInteger();
+
+        Thread conditionCheckThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                assertRocksDBTurnsOnBulkLoading(conditionCheckCount, conditionNotMet);
+
+                assertRockDBTurnsOffBulkLoad(conditionCheckCount, conditionNotMet);
+            }
+        });
+
+        subject.init(context, subject);
+
+        conditionCheckThread.start();
+        context.restore(subject.name(), entries);
+
+        conditionCheckThread.join(2000);
+
+        assertTrue(conditionNotMet.get() == null);
+        assertTrue(conditionCheckCount.get() == 2);
+    }
+
+
+
+    @Test
     public void shouldThrowNullPointerExceptionOnNullPut() {
         subject.init(context, subject);
         try {
@@ -173,6 +207,36 @@ public class RocksDBStoreTest {
         subject.flush();
     }
 
+    private void assertRockDBTurnsOffBulkLoad(AtomicInteger conditionCount,
+                                              AtomicReference<Exception> conditionNotMet) {
+        try {
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return !subject.isPrepareForBulkload();
+                }
+            }, 1000L, "Did not revert bulk load setting");
+            conditionCount.getAndIncrement();
+        } catch (Exception e) {
+            conditionNotMet.set(e);
+        }
+    }
+
+    private void assertRocksDBTurnsOnBulkLoading(AtomicInteger conditionCount,
+                                                 AtomicReference<Exception> conditionNotMet) {
+        try {
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return subject.isPrepareForBulkload();
+                }
+            }, 1000L, "Did not prepare for bulk load");
+            conditionCount.getAndIncrement();
+        } catch (Exception e) {
+            conditionNotMet.set(e);
+        }
+    }
+
     public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
         static boolean called;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 50d1ebf..62cb09c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -198,7 +199,7 @@ public class StreamThreadStateStoreProviderTest {
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000, new MockStateRestoreListener()),
             streamsConfig,
             new MockStreamsMetrics(new Metrics()),
             stateDirectory,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 515d35d..b34b201 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -24,22 +24,27 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -281,10 +286,20 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
     }
 
     public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) {
-        final StateRestoreCallback restoreCallback = restoreFuncs.get(storeName);
-        for (final KeyValue<byte[], byte[]> entry : changeLog) {
-            restoreCallback.restore(entry.key, entry.value);
+
+        final BatchingStateRestoreCallback restoreCallback = getBatchingRestoreCallback(restoreFuncs.get(storeName));
+        final StateRestoreListener restoreListener = getStateRestoreListener(restoreCallback);
+
+        restoreListener.onRestoreStart(null, storeName, 0L, 0L);
+
+        List<KeyValue<byte[], byte[]>> records = new ArrayList<>();
+        for (KeyValue<byte[], byte[]> keyValue : changeLog) {
+            records.add(keyValue);
         }
+
+        restoreCallback.restoreAll(records);
+
+        restoreListener.onRestoreEnd(null, storeName, 0L);
     }
 
     @Override
@@ -305,4 +320,21 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
     public void close() {
         metrics.close();
     }
+
+    private StateRestoreListener getStateRestoreListener(StateRestoreCallback restoreCallback) {
+        if (restoreCallback instanceof StateRestoreListener) {
+            return (StateRestoreListener) restoreCallback;
+        }
+
+        return CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER;
+    }
+
+    private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback restoreCallback) {
+        if (restoreCallback instanceof BatchingStateRestoreCallback) {
+            return (BatchingStateRestoreCallback) restoreCallback;
+        }
+
+        return new WrappedBatchingStateRestoreCallback(restoreCallback);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
new file mode 100644
index 0000000..e7b328c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback {
+
+    // verifies store name called for each state
+    public final Map<String, String> storeNameCalledStates = new HashMap<>();
+    public final List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();
+    public long restoreStartOffset;
+    public long restoreEndOffset;
+    public long restoredBatchOffset;
+    public long numBatchRestored;
+    public long totalNumRestored;
+    public TopicPartition restoreTopicPartition;
+
+    public static final String RESTORE_START = "restore_start";
+    public static final String RESTORE_BATCH = "restore_batch";
+    public static final String RESTORE_END = "restore_end";
+
+    @Override
+    public void restore(byte[] key, byte[] value) {
+        restored.add(KeyValue.pair(key, value));
+    }
+
+    @Override
+    public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
+                               final long startingOffset, final long endingOffset) {
+        restoreTopicPartition = topicPartition;
+        storeNameCalledStates.put(RESTORE_START, storeName);
+        restoreStartOffset = startingOffset;
+        restoreEndOffset = endingOffset;
+    }
+
+    @Override
+    public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
+                                final long batchEndOffset, final long numRestored) {
+        restoreTopicPartition = topicPartition;
+        storeNameCalledStates.put(RESTORE_BATCH, storeName);
+        restoredBatchOffset = batchEndOffset;
+        numBatchRestored = numRestored;
+
+    }
+
+    @Override
+    public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
+                             final long totalRestored) {
+        restoreTopicPartition = topicPartition;
+        storeNameCalledStates.put(RESTORE_END, storeName);
+        totalNumRestored = totalRestored;
+    }
+
+    @Override
+    public String toString() {
+        return "MockStateRestoreListener{" +
+               "storeNameCalledStates=" + storeNameCalledStates +
+               ", restored=" + restored +
+               ", restoreStartOffset=" + restoreStartOffset +
+               ", restoreEndOffset=" + restoreEndOffset +
+               ", restoredBatchOffset=" + restoredBatchOffset +
+               ", numBatchRestored=" + numBatchRestored +
+               ", totalNumRestored=" + totalNumRestored +
+               ", restoreTopicPartition=" + restoreTopicPartition +
+               '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c50c941a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index d026c60..ce25e67 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -152,6 +153,7 @@ public class ProcessorTopologyTestDriver {
     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
     private final Set<String> internalTopics = new HashSet<>();
     private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
+    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private StreamTask task;
     private GlobalStateUpdateTask globalStateTask;
 
@@ -221,7 +223,8 @@ public class ProcessorTopologyTestDriver {
                                   new StoreChangelogReader(
                                       createRestoreConsumer(topology.storeToChangelogTopic()),
                                       Time.SYSTEM,
-                                      5000),
+                                      5000,
+                                      stateRestoreListener),
                                   config,
                                   streamsMetrics, stateDirectory,
                                   cache,


Mime
View raw message