kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/5] kafka git commit: KAFKA-3452: Support session windows
Date Fri, 06 Jan 2017 18:12:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 00964ec74 -> e0de3a421


http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index b15ebab..fc30740 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -60,13 +60,14 @@ public class RocksDBWindowStoreTest {
     private final Serde<byte[]> byteArraySerde = Serdes.ByteArray();
     private final String windowName = "window";
     private final int numSegments = 3;
-    private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
+    private final long segmentSize = Segments.MIN_SEGMENT_INTERVAL;
     private final long retentionPeriod = segmentSize * (numSegments - 1);
     private final long windowSize = 3;
     private final Serde<Integer> intSerde = Serdes.Integer();
     private final Serde<String> stringSerde = Serdes.String();
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", intSerde, stringSerde);
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
+    private final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
 
     @SuppressWarnings("unchecked")
     protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) {
@@ -449,7 +450,7 @@ public class RocksDBWindowStoreTest {
                 recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
             WindowStore<Integer, String> store = createWindowStore(context, true, false);
-            assertTrue(store instanceof CachingWindowStore);
+            assertTrue(store instanceof CachedStateStore);
         } finally {
             Utils.delete(baseDir);
         }
@@ -480,30 +481,36 @@ public class RocksDBWindowStoreTest {
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
             try {
+                // to validate segments
+                final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
                 long startTime = segmentSize * 2;
                 long incr = segmentSize / 2;
                 context.setRecordContext(createRecordContext(startTime));
                 store.put(0, "zero");
-                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+                assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
 
                 context.setRecordContext(createRecordContext(startTime + incr));
                 store.put(1, "one");
-                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+                assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
 
                 context.setRecordContext(createRecordContext(startTime + incr * 2));
                 store.put(2, "two");
-                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+                assertEquals(Utils.mkSet(segments.segmentName(2),
+                                         segments.segmentName(3)), segmentDirs(baseDir));
 
-                // (3, "three") is not put
-                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
 
                 context.setRecordContext(createRecordContext(startTime + incr * 4));
                 store.put(4, "four");
-                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+                assertEquals(Utils.mkSet(segments.segmentName(2),
+                                         segments.segmentName(3),
+                                         segments.segmentName(4)), segmentDirs(baseDir));
+
 
                 context.setRecordContext(createRecordContext(startTime + incr * 5));
                 store.put(5, "five");
-                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+                assertEquals(Utils.mkSet(segments.segmentName(2),
+                                         segments.segmentName(3),
+                                         segments.segmentName(4)), segmentDirs(baseDir));
 
                 assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
                 assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
@@ -514,7 +521,10 @@ public class RocksDBWindowStoreTest {
 
                 context.setRecordContext(createRecordContext(startTime + incr * 6));
                 store.put(6, "six");
-                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+                assertEquals(Utils.mkSet(segments.segmentName(3),
+                                         segments.segmentName(4),
+                                         segments.segmentName(5)), segmentDirs(baseDir));
+
 
                 assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
                 assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
@@ -527,7 +537,9 @@ public class RocksDBWindowStoreTest {
 
                 context.setRecordContext(createRecordContext(startTime + incr * 7));
                 store.put(7, "seven");
-                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+                assertEquals(Utils.mkSet(segments.segmentName(3),
+                                         segments.segmentName(4),
+                                         segments.segmentName(5)), segmentDirs(baseDir));
 
                 assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
                 assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
@@ -540,7 +552,10 @@ public class RocksDBWindowStoreTest {
 
                 context.setRecordContext(createRecordContext(startTime + incr * 8));
                 store.put(8, "eight");
-                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+                assertEquals(Utils.mkSet(segments.segmentName(4),
+                                         segments.segmentName(5),
+                                         segments.segmentName(6)), segmentDirs(baseDir));
+
 
                 assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
                 assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
@@ -554,10 +569,10 @@ public class RocksDBWindowStoreTest {
 
                 // check segment directories
                 store.flush();
-                assertEquals(
-                        Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
-                        segmentDirs(baseDir)
-                );
+                assertEquals(Utils.mkSet(segments.segmentName(4),
+                                         segments.segmentName(5),
+                                         segments.segmentName(6)), segmentDirs(baseDir));
+
             } finally {
                 store.close();
             }
@@ -567,6 +582,7 @@ public class RocksDBWindowStoreTest {
         }
     }
 
+
     @Test
     public void testRestore() throws IOException {
         final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
@@ -641,14 +657,10 @@ public class RocksDBWindowStoreTest {
                     recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
-            RocksDBWindowStore<Integer, String> inner =
-                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
             try {
                 context.restore(windowName, changeLog);
 
-                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
-
                 assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
                 assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
                 assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
@@ -662,7 +674,7 @@ public class RocksDBWindowStoreTest {
                 // check segment directories
                 store.flush();
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
+                        Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
                         segmentDirs(baseDir)
                 );
             } finally {
@@ -693,16 +705,13 @@ public class RocksDBWindowStoreTest {
                     recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
-            RocksDBWindowStore<Integer, String> inner =
-                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
-
             try {
 
                 context.setTime(0L);
                 context.setRecordContext(createRecordContext(0));
                 store.put(0, "v");
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(0L)),
+                        Utils.mkSet(segments.segmentName(0L)),
                         segmentDirs(baseDir)
                 );
 
@@ -710,14 +719,14 @@ public class RocksDBWindowStoreTest {
                 store.put(0, "v");
                 store.put(0, "v");
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(0L)),
+                        Utils.mkSet(segments.segmentName(0L)),
                         segmentDirs(baseDir)
                 );
 
                 context.setRecordContext(createRecordContext(60000));
                 store.put(0, "v");
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)),
+                        Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
                         segmentDirs(baseDir)
                 );
 
@@ -733,7 +742,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(4, fetchedCount);
 
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)),
+                        Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
                         segmentDirs(baseDir)
                 );
 
@@ -749,7 +758,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(2, fetchedCount);
 
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(1L), inner.segmentName(3L)),
+                        Utils.mkSet(segments.segmentName(1L), segments.segmentName(3L)),
                         segmentDirs(baseDir)
                 );
 
@@ -765,7 +774,7 @@ public class RocksDBWindowStoreTest {
                 assertEquals(1, fetchedCount);
 
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(3L), inner.segmentName(5L)),
+                        Utils.mkSet(segments.segmentName(3L), segments.segmentName(5L)),
                         segmentDirs(baseDir)
                 );
 
@@ -798,27 +807,25 @@ public class RocksDBWindowStoreTest {
             File storeDir = new File(baseDir, windowName);
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
-            RocksDBWindowStore<Integer, String> inner =
-                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
 
             try {
-                new File(storeDir, inner.segmentName(0L)).mkdir();
-                new File(storeDir, inner.segmentName(1L)).mkdir();
-                new File(storeDir, inner.segmentName(2L)).mkdir();
-                new File(storeDir, inner.segmentName(3L)).mkdir();
-                new File(storeDir, inner.segmentName(4L)).mkdir();
-                new File(storeDir, inner.segmentName(5L)).mkdir();
-                new File(storeDir, inner.segmentName(6L)).mkdir();
+                new File(storeDir, segments.segmentName(0L)).mkdir();
+                new File(storeDir, segments.segmentName(1L)).mkdir();
+                new File(storeDir, segments.segmentName(2L)).mkdir();
+                new File(storeDir, segments.segmentName(3L)).mkdir();
+                new File(storeDir, segments.segmentName(4L)).mkdir();
+                new File(storeDir, segments.segmentName(5L)).mkdir();
+                new File(storeDir, segments.segmentName(6L)).mkdir();
             } finally {
                 store.close();
             }
 
             store = createWindowStore(context, false, true);
-            inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
             try {
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
+                        Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
                         segmentDirs(baseDir)
                 );
 
@@ -829,7 +836,7 @@ public class RocksDBWindowStoreTest {
                 }
 
                 assertEquals(
-                        Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
+                        Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
                         segmentDirs(baseDir)
                 );
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
new file mode 100644
index 0000000..3869480
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SegmentIteratorTest {
+
+    private final Segment segmentOne = new Segment("one", "one", 0);
+    private final Segment segmentTwo = new Segment("two", "window", 1);
+    private final HasNextCondition hasNextCondition = new HasNextCondition() {
+        @Override
+        public boolean hasNext(final KeyValueIterator iterator) {
+            return iterator.hasNext();
+        }
+    };
+
+    @Before
+    public void before() {
+        final MockProcessorContext context = new MockProcessorContext(null,
+                                                                      TestUtils.tempDirectory(),
+                                                                      Serdes.String(),
+                                                                      Serdes.String(),
+                                                                      new NoOpRecordCollector(),
+                                                                      new ThreadCache(0));
+        segmentOne.openDB(context);
+        segmentTwo.openDB(context);
+        segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
+        segmentOne.put(Bytes.wrap("b".getBytes()), "2".getBytes());
+        segmentTwo.put(Bytes.wrap("c".getBytes()), "3".getBytes());
+        segmentTwo.put(Bytes.wrap("d".getBytes()), "4".getBytes());
+
+    }
+
+    @After
+    public void closeSegments() {
+        segmentOne.close();
+        segmentTwo.close();
+    }
+
+    @Test
+    public void shouldIterateOverAllSegments() throws Exception {
+        final SegmentIterator iterator = new SegmentIterator(
+                Arrays.asList(segmentOne,
+                              segmentTwo).iterator(),
+                hasNextCondition,
+                Bytes.wrap("a".getBytes()),
+                Bytes.wrap("z".getBytes()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("a", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("b", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("c", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("d", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next()));
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldOnlyIterateOverSegmentsInRange() throws Exception {
+        final SegmentIterator iterator = new SegmentIterator(
+                Arrays.asList(segmentOne,
+                              segmentTwo).iterator(),
+                hasNextCondition,
+                Bytes.wrap("a".getBytes()),
+                Bytes.wrap("b".getBytes()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("a", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next()));
+
+        assertTrue(iterator.hasNext());
+        assertEquals("b", new String(iterator.peekNextKey().get()));
+        assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next()));
+
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() throws Exception {
+        final SegmentIterator iterator = new SegmentIterator(
+                Arrays.asList(segmentOne,
+                              segmentTwo).iterator(),
+                hasNextCondition,
+                Bytes.wrap("f".getBytes()),
+                Bytes.wrap("h".getBytes()));
+
+        iterator.peekNextKey();
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void shouldThrowNoSuchElementOnNextIfNoNext() throws Exception {
+        final SegmentIterator iterator = new SegmentIterator(
+                Arrays.asList(segmentOne,
+                              segmentTwo).iterator(),
+                hasNextCondition,
+                Bytes.wrap("f".getBytes()),
+                Bytes.wrap("h".getBytes()));
+
+        iterator.next();
+    }
+
+    private KeyValue<String, String> toStringKeyValue(final KeyValue<Bytes, byte[]> binaryKv) {
+        return KeyValue.pair(new String(binaryKv.key.get()), new String(binaryKv.value));
+    }
+
+}
\ No newline at end of file
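
The test above pins down the observable contract of the new SegmentIterator: it walks a list of segments in order, only visits entries inside the requested key range, and throws NoSuchElementException from next() and peekNextKey() once everything is drained. The sketch below only illustrates that chaining pattern; ChainedIterator and its signature are invented for the example and are not the patch's implementation, which additionally applies the key range and HasNextCondition per segment.

import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustration only: chains per-segment iterators so hasNext() skips
// exhausted segments and next() fails once every segment is drained,
// matching the expectations encoded in SegmentIteratorTest above.
public class ChainedIterator<T> implements Iterator<T> {
    private final Iterator<? extends Iterator<T>> segments;
    private Iterator<T> current = null;

    public ChainedIterator(final Iterator<? extends Iterator<T>> segments) {
        this.segments = segments;
    }

    @Override
    public boolean hasNext() {
        // advance to the next segment that still has entries
        while ((current == null || !current.hasNext()) && segments.hasNext()) {
            current = segments.next();
        }
        return current != null && current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    public static void main(final String[] args) {
        final Iterator<String> it = new ChainedIterator<>(
                Arrays.asList(Arrays.asList("a", "b").iterator(),
                              Arrays.asList("c", "d").iterator()).iterator());
        while (it.hasNext()) {
            System.out.println(it.next());   // a, b, c, d - one pass over both "segments"
        }
    }
}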

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
new file mode 100644
index 0000000..9c2f688
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class SegmentsTest {
+
+    private static final int NUM_SEGMENTS = 5;
+    private MockProcessorContext context;
+    private Segments segments;
+
+    @Before
+    public void createContext() {
+        context = new MockProcessorContext(null,
+                                           TestUtils.tempDirectory(),
+                                           Serdes.String(),
+                                           Serdes.Long(),
+                                           new NoOpRecordCollector(),
+                                           new ThreadCache(0));
+        segments = new Segments("test", 4 * 60 * 1000, NUM_SEGMENTS);
+    }
+
+    @After
+    public void close() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() throws Exception {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(60000));
+        assertEquals(2, segments.segmentId(120000));
+        assertEquals(3, segments.segmentId(180000));
+    }
+
+    @Test
+    public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() throws Exception {
+        final Segments segments = new Segments("test", 8 * 60 * 1000, 5);
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(0, segments.segmentId(60000));
+        assertEquals(1, segments.segmentId(120000));
+    }
+
+    @Test
+    public void shouldGetSegmentNameFromId() throws Exception {
+        assertEquals("test-197001010000", segments.segmentName(0));
+        assertEquals("test-197001010001", segments.segmentName(1));
+        assertEquals("test-197001010002", segments.segmentName(2));
+    }
+
+    @Test
+    public void shouldCreateSegments() throws Exception {
+        final Segment segment1 = segments.getOrCreateSegment(0, context);
+        final Segment segment2 = segments.getOrCreateSegment(1, context);
+        final Segment segment3 = segments.getOrCreateSegment(2, context);
+        assertTrue(new File(context.stateDir(), "test/test-197001010000").isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test-197001010001").isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test-197001010002").isDirectory());
+        assertEquals(true, segment1.isOpen());
+        assertEquals(true, segment2.isOpen());
+        assertEquals(true, segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() throws Exception {
+        segments.getOrCreateSegment(7, context);
+        assertNull(segments.getOrCreateSegment(0, context));
+        assertFalse(new File(context.stateDir(), "test/test-197001010000").exists());
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() throws Exception {
+        final Segment segment1 = segments.getOrCreateSegment(0, context);
+        final Segment segment2 = segments.getOrCreateSegment(1, context);
+        final Segment segment3 = segments.getOrCreateSegment(7, context);
+        assertFalse(segment1.isOpen());
+        assertFalse(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+        assertFalse(new File(context.stateDir(), "test/test-197001010000").exists());
+        assertFalse(new File(context.stateDir(), "test/test-197001010001").exists());
+        assertTrue(new File(context.stateDir(), "test/test-197001010007").exists());
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() throws Exception {
+        final Segment segment = segments.getOrCreateSegment(0, context);
+        segments.getOrCreateSegment(1, context);
+        assertEquals(segment, segments.getSegmentForTimestamp(0L));
+    }
+
+    @Test
+    public void shouldCloseAllOpenSegments() throws Exception {
+        final Segment first = segments.getOrCreateSegment(0, context);
+        final Segment second = segments.getOrCreateSegment(1, context);
+        final Segment third = segments.getOrCreateSegment(2, context);
+        segments.close();
+
+        assertFalse(first.isOpen());
+        assertFalse(second.isOpen());
+        assertFalse(third.isOpen());
+    }
+
+    @Test
+    public void shouldOpenExistingSegments() throws Exception {
+        segments.getOrCreateSegment(0, context);
+        segments.getOrCreateSegment(1, context);
+        segments.getOrCreateSegment(2, context);
+        segments.getOrCreateSegment(3, context);
+        segments.getOrCreateSegment(4, context);
+        // close existing.
+        segments.close();
+
+        segments = new Segments("test", 4 * 60 * 1000, 5);
+        segments.openExisting(context);
+
+        assertTrue(segments.getSegmentForTimestamp(0).isOpen());
+        assertTrue(segments.getSegmentForTimestamp(1).isOpen());
+        assertTrue(segments.getSegmentForTimestamp(2).isOpen());
+        assertTrue(segments.getSegmentForTimestamp(3).isOpen());
+        assertTrue(segments.getSegmentForTimestamp(4).isOpen());
+    }
+
+    @Test
+    public void shouldGetSegmentsWithinTimeRange() throws Exception {
+        segments.getOrCreateSegment(0, context);
+        segments.getOrCreateSegment(1, context);
+        segments.getOrCreateSegment(2, context);
+        segments.getOrCreateSegment(3, context);
+        segments.getOrCreateSegment(4, context);
+
+        final List<Segment> segments = this.segments.segments(0, 2 * 60 * 1000);
+        assertEquals(3, segments.size());
+        assertEquals(0, segments.get(0).id);
+        assertEquals(1, segments.get(1).id);
+        assertEquals(2, segments.get(2).id);
+    }
+
+    @Test
+    public void shouldRollSegments() throws Exception {
+        segments.getOrCreateSegment(0, context);
+        verifyCorrectSegments(0, 1);
+        segments.getOrCreateSegment(1, context);
+        verifyCorrectSegments(0, 2);
+        segments.getOrCreateSegment(2, context);
+        verifyCorrectSegments(0, 3);
+        segments.getOrCreateSegment(3, context);
+        verifyCorrectSegments(0, 4);
+        segments.getOrCreateSegment(4, context);
+        verifyCorrectSegments(0, 5);
+        segments.getOrCreateSegment(5, context);
+        verifyCorrectSegments(1, 5);
+        segments.getOrCreateSegment(6, context);
+        verifyCorrectSegments(2, 5);
+    }
+
+    private void verifyCorrectSegments(final long first, final int numSegments) {
+        final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
+        assertEquals(numSegments, result.size());
+        for (int i = 0; i < numSegments; i++) {
+            assertEquals(i + first, result.get(i).id);
+        }
+    }
+}
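
Taken together, the assertions above constrain how Segments appears to map timestamps to segment ids and names: the segment interval works out to retentionPeriod / (numSegments - 1), a timestamp's segment id is timestamp / interval, and the directory name is the store name plus the segment's start time formatted as yyyyMMddHHmm in UTC. Below is a minimal sketch under those assumptions; SegmentNaming and its helpers are invented for illustration, and the real Segments class also enforces a minimum interval and expires old segments.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Rough model of the id/name scheme the assertions above imply; details of
// the actual Segments implementation may differ.
public class SegmentNaming {

    static long segmentInterval(final long retentionPeriod, final int numSegments) {
        return retentionPeriod / (numSegments - 1);           // 4 min / 4 = 60000 ms in SegmentsTest
    }

    static long segmentId(final long timestamp, final long interval) {
        return timestamp / interval;                          // segmentId(60000) == 1
    }

    static String segmentName(final String storeName, final long segmentId, final long interval) {
        final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return storeName + "-" + formatter.format(new Date(segmentId * interval));
    }

    public static void main(final String[] args) {
        final long interval = segmentInterval(4 * 60 * 1000, 5);
        System.out.println(segmentId(60000, interval));        // 1
        System.out.println(segmentName("test", 0, interval));  // test-197001010000
        System.out.println(segmentName("test", 2, interval));  // test-197001010002
    }
}

The same scheme matches the RocksDBWindowStoreTest expectations earlier in this patch, where a put that lands in segment 6 leaves only directories 4, 5 and 6 behind: with numSegments = 3 the oldest surviving id is maxSegmentId - numSegments + 1.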

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 6c446ec..8aaf9e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.junit.Test;
 
@@ -242,8 +243,8 @@ public class ThreadCacheTest {
         final String namespace = "streams";
         cache.put(namespace, theByte, dirtyEntry(theByte));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, new byte[]{1});
-        assertArrayEquals(theByte, iterator.peekNextKey());
-        assertArrayEquals(theByte, iterator.peekNextKey());
+        assertEquals(Bytes.wrap(theByte), iterator.peekNextKey());
+        assertEquals(Bytes.wrap(theByte), iterator.peekNextKey());
     }
 
     @Test
@@ -253,7 +254,7 @@ public class ThreadCacheTest {
         final String namespace = "streams";
         cache.put(namespace, theByte, dirtyEntry(theByte));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, new byte[]{1});
-        assertArrayEquals(iterator.peekNextKey(), iterator.next().key);
+        assertEquals(iterator.peekNextKey(), iterator.next().key);
     }
 
     @Test(expected = NoSuchElementException.class)
@@ -281,10 +282,10 @@ public class ThreadCacheTest {
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, new byte[]{1}, new byte[]{4});
         int bytesIndex = 1;
         while (iterator.hasNext()) {
-            byte[] peekedKey = iterator.peekNextKey();
-            final KeyValue<byte[], LRUCacheEntry> next = iterator.next();
-            assertArrayEquals(bytes[bytesIndex], peekedKey);
-            assertArrayEquals(bytes[bytesIndex], next.key);
+            Bytes peekedKey = iterator.peekNextKey();
+            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
+            assertArrayEquals(bytes[bytesIndex], peekedKey.get());
+            assertArrayEquals(bytes[bytesIndex], next.key.get());
             bytesIndex++;
         }
         assertEquals(5, bytesIndex);
@@ -311,7 +312,7 @@ public class ThreadCacheTest {
         // should evict byte[] {0}
         cache.put(namespace, new byte[]{6}, dirtyEntry(new byte[]{6}));
 
-        assertArrayEquals(new byte[]{1}, range.peekNextKey());
+        assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
     }
 
     @Test
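
The assertion changes in this file follow from the cache iterator now yielding Bytes keys instead of raw byte[]: arrays only have identity equality, so the old code needed assertArrayEquals, while Bytes wraps the array with content-based equals/hashCode and works with plain assertEquals. A small stand-alone illustration (the class name is made up for the example):

import org.apache.kafka.common.utils.Bytes;
import java.util.Arrays;

// Why the test switched from assertArrayEquals to assertEquals: raw arrays
// compare by identity, while Bytes compares the wrapped contents.
public class BytesEqualityDemo {
    public static void main(final String[] args) {
        final byte[] a = new byte[]{1};
        final byte[] b = new byte[]{1};

        System.out.println(a.equals(b));                          // false - identity only
        System.out.println(Arrays.equals(a, b));                  // true  - what assertArrayEquals checks
        System.out.println(Bytes.wrap(a).equals(Bytes.wrap(b)));  // true  - what assertEquals now checks
    }
}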

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/test/KeyValueIteratorStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KeyValueIteratorStub.java b/streams/src/test/java/org/apache/kafka/test/KeyValueIteratorStub.java
new file mode 100644
index 0000000..4642307
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/KeyValueIteratorStub.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.Iterator;
+
+public class KeyValueIteratorStub<K, V> implements KeyValueIterator<K, V> {
+
+    private final Iterator<KeyValue<K, V>> iterator;
+
+    public KeyValueIteratorStub(final Iterator<KeyValue<K, V>> iterator) {
+        this.iterator = iterator;
+    }
+
+    @Override
+    public void close() {
+        //no-op
+    }
+
+    @Override
+    public K peekNextKey() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    @Override
+    public KeyValue<K, V> next() {
+        return iterator.next();
+    }
+
+    @Override
+    public void remove() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index f4ab642..f058e30 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -95,6 +96,9 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
     }
 
     public void setTime(long timestamp) {
+        if (recordContext != null) {
+            recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(), recordContext.partition(), recordContext.topic());
+        }
         this.timestamp = timestamp;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
new file mode 100644
index 0000000..6169474
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V>, StateStore {
+    private Map<K, List<KeyValue<Windowed<K>, V>>> sessions = new HashMap<>();
+    private boolean open = true;
+
+    public void put(final Windowed<K> sessionKey, final V value) {
+        if (!sessions.containsKey(sessionKey.key())) {
+            sessions.put(sessionKey.key(), new ArrayList<KeyValue<Windowed<K>, V>>());
+        }
+        sessions.get(sessionKey.key()).add(KeyValue.pair(sessionKey, value));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+        if (!open) {
+            throw new InvalidStateStoreException("not open");
+        }
+        if (!sessions.containsKey(key)) {
+            return new KeyValueIteratorStub<>(Collections.<KeyValue<Windowed<K>, V>>emptyIterator());
+        }
+        return new KeyValueIteratorStub<>(sessions.get(key).iterator());
+    }
+
+    @Override
+    public String name() {
+        return "";
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+
+    }
+
+    @Override
+    public void flush() {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+
+    public void setOpen(final boolean open) {
+        this.open = open;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0de3a42/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
new file mode 100644
index 0000000..3092d47
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SegmentedBytesStoreStub implements SegmentedBytesStore {
+    private Map<Bytes, byte[]> store = new HashMap<>();
+    public boolean fetchCalled;
+    public boolean allUpToCalled;
+    public boolean flushed;
+    public boolean closed;
+    public boolean initialized;
+    public boolean removeCalled;
+    public boolean putCalled;
+    public boolean getCalled;
+
+    @Override
+    public String name() {
+        return "";
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        initialized = true;
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
+        fetchCalled = true;
+        return new KeyValueIteratorStub<>(Collections.<KeyValue<Bytes, byte[]>>emptyIterator());
+    }
+
+    @Override
+    public void remove(final Bytes key) {
+        store.put(key, null);
+        removeCalled = true;
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value) {
+        store.put(key, value);
+        putCalled = true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return false;
+    }
+
+
+    @Override
+    public byte[] get(final Bytes key) {
+        getCalled = true;
+        return store.get(key);
+    }
+
+    @Override
+    public void flush() {
+        flushed = true;
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+}

