kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5701: fix flaky RocksDBStore unit test
Date Mon, 07 Aug 2017 22:20:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1f0ad0121 -> 57770dd23


KAFKA-5701: fix flaky RocksDBStore unit test

1. Remove the separate thread from the test, which was failing periodically due to a race condition.
2. Replace the anonymous `AbstractNotifyingBatchingRestoreCallback` with a concrete inner class,
`RocksDBBatchingRestoreCallback`, and store it in a package-private variable. The class is static, so
its dependency on `RocksDBStore` has to be initialized explicitly.

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3640 from bbejeck/KAFKA-5701_fix_flaky_unit_test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57770dd2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57770dd2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57770dd2

Branch: refs/heads/trunk
Commit: 57770dd235a82994f296971079a97281be0a130b
Parents: 1f0ad01
Author: Bill Bejeck <bill@confluent.io>
Authored: Mon Aug 7 15:20:24 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Aug 7 15:20:24 2017 -0700

----------------------------------------------------------------------
 .../streams/state/internals/RocksDBStore.java   | 52 ++++++++++-----
 .../state/internals/RocksDBStoreTest.java       | 70 +++++---------------
 2 files changed, 52 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/57770dd2/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e68db14..fbaeef2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -101,6 +102,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
 
     private volatile boolean prepareForBulkload = false;
     private ProcessorContext internalProcessorContext;
+    // visible for testing
+    volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null;
 
     protected volatile boolean open = false;
 
@@ -180,27 +183,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
         // open the DB dir
         this.internalProcessorContext = context;
         openDB(context);
+        this.batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
 
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        context.register(root, false, new AbstractNotifyingBatchingRestoreCallback() {
-            @Override
-            public void restoreAll(Collection<KeyValue<byte[], byte[]>> records)
{
-                restoreAllInternal(records);
-            }
-
-            @Override
-            public void onRestoreStart(TopicPartition topicPartition, String storeName,
-                                       long startingOffset, long endingOffset) {
-                toggleDbForBulkLoading(true);
-            }
-
-            @Override
-            public void onRestoreEnd(TopicPartition topicPartition, String storeName,
-                                     long totalRestored) {
-                toggleDbForBulkLoading(false);
-            }
-        });
+        context.register(root, false, this.batchingStateRestoreCallback);
 
         open = true;
     }
@@ -538,4 +525,33 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
             return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey)
<= 0;
         }
     }
+
+    private static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback
{
+
+        private final RocksDBStore rocksDBStore;
+
+        RocksDBBatchingRestoreCallback(final RocksDBStore rocksDBStore) {
+            this.rocksDBStore = rocksDBStore;
+        }
+
+        @Override
+        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records)
{
+            rocksDBStore.restoreAllInternal(records);
+        }
+
+        @Override
+        public void onRestoreStart(final TopicPartition topicPartition,
+                                   final String storeName,
+                                   final long startingOffset,
+                                   final long endingOffset) {
+            rocksDBStore.toggleDbForBulkLoading(true);
+        }
+
+        @Override
+        public void onRestoreEnd(final TopicPartition topicPartition,
+                                 final String storeName,
+                                 final long totalRestored) {
+            rocksDBStore.toggleDbForBulkLoading(false);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57770dd2/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 8786b15..831febd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -23,12 +23,12 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -41,10 +41,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -123,33 +122,30 @@ public class RocksDBStoreTest {
     }
 
     @Test
-    public void shouldTogglePrepareForBulkLoadDuringRestoreCalls() throws Exception {
+    public void shouldTogglePrepareForBulkloadSetting() {
+        subject.init(context, subject);
+        StateRestoreListener restoreListener = (StateRestoreListener) subject.batchingStateRestoreCallback;
+
+        restoreListener.onRestoreStart(null, null, 0, 0);
+        assertTrue("Should have set bulk loading to true", subject.isPrepareForBulkload());
+
+        restoreListener.onRestoreEnd(null, null, 0);
+        assertFalse("Should have set bulk loading to false", subject.isPrepareForBulkload());
+    }
+
+    @Test
+    public void shouldRestoreAll() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
         entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
         entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
 
-        final AtomicReference<Exception> conditionNotMet = new AtomicReference<>();
-        final AtomicInteger conditionCheckCount = new AtomicInteger();
-
-        Thread conditionCheckThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                assertRocksDBTurnsOnBulkLoading(conditionCheckCount, conditionNotMet);
-
-                assertRockDBTurnsOffBulkLoad(conditionCheckCount, conditionNotMet);
-            }
-        });
-
         subject.init(context, subject);
-
-        conditionCheckThread.start();
         context.restore(subject.name(), entries);
 
-        conditionCheckThread.join(2000);
-
-        assertTrue(conditionNotMet.get() == null);
-        assertTrue(conditionCheckCount.get() == 2);
+        assertEquals(subject.get("1"), "a");
+        assertEquals(subject.get("2"), "b");
+        assertEquals(subject.get("3"), "c");
     }
 
 
@@ -207,36 +203,6 @@ public class RocksDBStoreTest {
         subject.flush();
     }
 
-    private void assertRockDBTurnsOffBulkLoad(AtomicInteger conditionCount,
-                                              AtomicReference<Exception> conditionNotMet)
{
-        try {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return !subject.isPrepareForBulkload();
-                }
-            }, 1000L, "Did not revert bulk load setting");
-            conditionCount.getAndIncrement();
-        } catch (Exception e) {
-            conditionNotMet.set(e);
-        }
-    }
-
-    private void assertRocksDBTurnsOnBulkLoading(AtomicInteger conditionCount,
-                                                 AtomicReference<Exception> conditionNotMet)
{
-        try {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return subject.isPrepareForBulkload();
-                }
-            }, 1000L, "Did not prepare for bulk load");
-            conditionCount.getAndIncrement();
-        } catch (Exception e) {
-            conditionNotMet.set(e);
-        }
-    }
-
     public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
         static boolean called;
 


Mime
View raw message