kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add Reduce functions
Date Thu, 21 Jan 2016 00:10:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e4ef8e664 -> 959cf09e8


http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
new file mode 100644
index 0000000..80ad67f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -0,0 +1,676 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.WindowStoreUtil;
+import org.apache.kafka.test.MockProcessorContext;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RocksDBWindowStoreTest {
+
+    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
+    private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
+    private final int numSegments = 3;
+    private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
+    private final long retentionPeriod = segmentSize * (numSegments - 1);
+    private final long windowSize = 3;
+    private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class,
String.class);
+
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context,
Serdes<K, V> serdes) {
+        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod,
numSegments, true, serdes, null);
+        WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
+    }
+
+    @Test
+    public void testPutAndFetch() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer,
byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1>
keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime + 0L);
+                store.put(0, "zero");
+                context.setTime(startTime + 1L);
+                store.put(1, "one");
+                context.setTime(startTime + 2L);
+                store.put(2, "two");
+                context.setTime(startTime + 3L);
+                // (3, "three") is not put
+                context.setTime(startTime + 4L);
+                store.put(4, "four");
+                context.setTime(startTime + 5L);
+                store.put(5, "five");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L -
windowSize, startTime + 0L + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L -
windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L -
windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize,
startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L -
windowSize, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L -
windowSize, startTime + 5L + windowSize)));
+
+                context.setTime(startTime + 3L);
+                store.put(2, "two+1");
+                context.setTime(startTime + 4L);
+                store.put(2, "two+2");
+                context.setTime(startTime + 5L);
+                store.put(2, "two+3");
+                context.setTime(startTime + 6L);
+                store.put(2, "two+4");
+                context.setTime(startTime + 7L);
+                store.put(2, "two+5");
+                context.setTime(startTime + 8L);
+                store.put(2, "two+6");
+
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize,
startTime - 2L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L -
windowSize, startTime - 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime
- windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2,
startTime + 1L - windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2,
startTime + 2L - windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2,
startTime + 3L - windowSize, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"),
toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5",
"two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"),
toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2,
startTime + 7L - windowSize, startTime + 7L + windowSize)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2,
startTime + 8L - windowSize, startTime + 8L + windowSize)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2,
startTime + 9L - windowSize, startTime + 9L + windowSize)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime
+ 10L - windowSize, startTime + 10L + windowSize)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L
- windowSize, startTime + 11L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize,
startTime + 12L + windowSize)));
+
+                // Flush the store and verify all current entries were properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog,
startTime);
+
+                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6",
"two+5@7", "two+6@8"), entriesByKey.get(2));
+                assertNull(entriesByKey.get(3));
+                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+                assertNull(entriesByKey.get(6));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testPutAndFetchBefore() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer,
byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1>
keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime + 0L);
+                store.put(0, "zero");
+                context.setTime(startTime + 1L);
+                store.put(1, "one");
+                context.setTime(startTime + 2L);
+                store.put(2, "two");
+                context.setTime(startTime + 3L);
+                // (3, "three") is not put
+                context.setTime(startTime + 4L);
+                store.put(4, "four");
+                context.setTime(startTime + 5L);
+                store.put(5, "five");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L -
windowSize, startTime + 0L)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L -
windowSize, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L -
windowSize, startTime + 2L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize,
startTime + 3L)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L -
windowSize, startTime + 4L)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L -
windowSize, startTime + 5L)));
+
+                context.setTime(startTime + 3L);
+                store.put(2, "two+1");
+                context.setTime(startTime + 4L);
+                store.put(2, "two+2");
+                context.setTime(startTime + 5L);
+                store.put(2, "two+3");
+                context.setTime(startTime + 6L);
+                store.put(2, "two+4");
+                context.setTime(startTime + 7L);
+                store.put(2, "two+5");
+                context.setTime(startTime + 8L);
+                store.put(2, "two+6");
+
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize,
startTime - 1L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize,
startTime + 0L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize,
startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L -
windowSize, startTime + 2L)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime
+ 3L - windowSize, startTime + 3L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2,
startTime + 4L - windowSize, startTime + 4L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2,
startTime + 5L - windowSize, startTime + 5L)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2,
startTime + 6L - windowSize, startTime + 6L)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2,
startTime + 7L - windowSize, startTime + 7L)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2,
startTime + 8L - windowSize, startTime + 8L)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2,
startTime + 9L - windowSize, startTime + 9L)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime
+ 10L - windowSize, startTime + 10L)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L
- windowSize, startTime + 11L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize,
startTime + 12L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize,
startTime + 13L)));
+
+                // Flush the store and verify all current entries were properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog,
startTime);
+
+                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6",
"two+5@7", "two+6@8"), entriesByKey.get(2));
+                assertNull(entriesByKey.get(3));
+                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+                assertNull(entriesByKey.get(6));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testPutAndFetchAfter() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer,
byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1>
keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime + 0L);
+                store.put(0, "zero");
+                context.setTime(startTime + 1L);
+                store.put(1, "one");
+                context.setTime(startTime + 2L);
+                store.put(2, "two");
+                context.setTime(startTime + 3L);
+                // (3, "three") is not put
+                context.setTime(startTime + 4L);
+                store.put(4, "four");
+                context.setTime(startTime + 5L);
+                store.put(5, "five");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L,
startTime + 0L + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime
+ 1L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime
+ 2L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime
+ 3L + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L,
startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L,
startTime + 5L + windowSize)));
+
+                context.setTime(startTime + 3L);
+                store.put(2, "two+1");
+                context.setTime(startTime + 4L);
+                store.put(2, "two+2");
+                context.setTime(startTime + 5L);
+                store.put(2, "two+3");
+                context.setTime(startTime + 6L);
+                store.put(2, "two+4");
+                context.setTime(startTime + 7L);
+                store.put(2, "two+5");
+                context.setTime(startTime + 8L);
+                store.put(2, "two+6");
+
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime
- 2L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime
- 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime,
startTime + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2,
startTime + 1L, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2,
startTime + 2L, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2,
startTime + 3L, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2,
startTime + 4L, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2,
startTime + 5L, startTime + 5L + windowSize)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2,
startTime + 6L, startTime + 6L + windowSize)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime
+ 7L, startTime + 7L + windowSize)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L,
startTime + 8L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime
+ 9L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime
+ 10L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime
+ 11L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime
+ 12L + windowSize)));
+
+                // Flush the store and verify all current entries were properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog,
startTime);
+
+                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6",
"two+5@7", "two+6@8"), entriesByKey.get(2));
+                assertNull(entriesByKey.get(3));
+                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+                assertNull(entriesByKey.get(6));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testPutSameKeyTimestamp() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer,
byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1>
keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime);
+                store.put(0, "zero");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize,
startTime + windowSize)));
+
+                context.setTime(startTime);
+                store.put(0, "zero");
+                context.setTime(startTime);
+                store.put(0, "zero+");
+                context.setTime(startTime);
+                store.put(0, "zero++");
+
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0,
startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0,
startTime + 1L - windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0,
startTime + 2L - windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0,
startTime + 3L - windowSize, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize,
startTime + 4L + windowSize)));
+
+                // Flush the store and verify all current entries were properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog,
startTime);
+
+                assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testRolling() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer,
byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1>
keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            RocksDBWindowStore<Integer, String> inner =
+                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer,
String>) store).inner();
+            try {
+                long startTime = segmentSize * 2;
+                long incr = segmentSize / 2;
+
+                context.setTime(startTime);
+                store.put(0, "zero");
+                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+                context.setTime(startTime + incr);
+                store.put(1, "one");
+                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 2);
+                store.put(2, "two");
+                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 3);
+                // (3, "three") is not put
+                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 4);
+                store.put(4, "four");
+                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+                context.setTime(startTime + incr * 5);
+                store.put(5, "five");
+                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize,
startTime + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr
- windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr
* 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 -
windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr
* 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr
* 5 - windowSize, startTime + incr * 5 + windowSize)));
+
+                context.setTime(startTime + incr * 6);
+                store.put(6, "six");
+                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize,
startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize,
startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr
* 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 -
windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr
* 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr
* 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr
* 6 - windowSize, startTime + incr * 6 + windowSize)));
+
+
+                context.setTime(startTime + incr * 7);
+                store.put(7, "seven");
+                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize,
startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize,
startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr
* 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 -
windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr
* 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr
* 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr
* 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr
* 7 - windowSize, startTime + incr * 7 + windowSize)));
+
+                context.setTime(startTime + incr * 8);
+                store.put(8, "eight");
+                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize,
startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize,
startTime + incr + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 -
windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 -
windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr
* 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr
* 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr
* 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr
* 7 - windowSize, startTime + incr * 7 + windowSize)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr
* 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+                // check segment directories
+                store.flush();
+                assertEquals(
+                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L),
inner.directorySuffix(6L)),
+                        segmentDirs(baseDir)
+                );
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testRestore() throws IOException {
+        final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+        long startTime = segmentSize * 2;
+        long incr = segmentSize / 2;
+
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer,
byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1>
keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            try {
+                context.setTime(startTime);
+                store.put(0, "zero");
+                context.setTime(startTime + incr);
+                store.put(1, "one");
+                context.setTime(startTime + incr * 2);
+                store.put(2, "two");
+                context.setTime(startTime + incr * 3);
+                store.put(3, "three");
+                context.setTime(startTime + incr * 4);
+                store.put(4, "four");
+                context.setTime(startTime + incr * 5);
+                store.put(5, "five");
+                context.setTime(startTime + incr * 6);
+                store.put(6, "six");
+                context.setTime(startTime + incr * 7);
+                store.put(7, "seven");
+                context.setTime(startTime + incr * 8);
+                store.put(8, "eight");
+                store.flush();
+
+            } finally {
+                store.close();
+            }
+
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+
+        File baseDir2 = Files.createTempDirectory("test").toFile();
+        try {
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer,
byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1>
keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                                    keySerializer.serialize(record.topic(), record.key()),
+                                    valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            RocksDBWindowStore<Integer, String> inner =
+                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer,
String>) store).inner();
+
+            try {
+                context.restore("window", changeLog);
+
+                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize,
startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize,
startTime + incr + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 -
windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 -
windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr
* 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr
* 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr
* 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr
* 7 - windowSize, startTime + incr * 7 + windowSize)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr
* 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+                // check segment directories
+                store.flush();
+                assertEquals(
+                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L),
inner.directorySuffix(6L)),
+                        segmentDirs(baseDir)
+                );
+            } finally {
+                store.close();
+            }
+
+
+        } finally {
+            Utils.delete(baseDir2);
+        }
+    }
+
+    private <E> List<E> toList(WindowStoreIterator<E> iterator) {
+        ArrayList<E> list = new ArrayList<>();
+        while (iterator.hasNext()) {
+            list.add(iterator.next().value);
+        }
+        return list;
+    }
+
+    private Set<String> segmentDirs(File baseDir) {
+        File rocksDbDir = new File(baseDir, "rocksdb");
+        String[] subdirs = rocksDbDir.list();
+
+        HashSet<String> set = new HashSet<>();
+
+        for (String subdir : subdirs) {
+            if (subdir.startsWith("window-"))
+            set.add(subdir.substring(7));
+        }
+        return set;
+    }
+
+    private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>>
changeLog, long startTime) {
+        HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
+
+        for (Entry<byte[], byte[]> entry : changeLog) {
+            long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key());
+            Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes);
+            String value = entry.value() == null ? null : serdes.valueFrom(entry.value());
+
+            Set<String> entries = entriesByKey.get(key);
+            if (entries == null) {
+                entries = new HashSet<>();
+                entriesByKey.put(key, entries);
+            }
+            entries.add(value + "@" + (timestamp - startTime));
+        }
+
+        return entriesByKey;
+    }
+}


Mime
View raw message