kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove 1 minute minimum segment interval (#5323)
Date Wed, 01 Aug 2018 18:01:17 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 814fbe0  MINOR: Remove 1 minute minimum segment interval (#5323)
814fbe0 is described below

commit 814fbe0feabea0f78b690f44ee99b61b07ea7dd2
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Aug 1 13:01:12 2018 -0500

    MINOR: Remove 1 minute minimum segment interval (#5323)
    
    * new minimum is 0, just like window size
    * refactor tests to use smaller segment sizes as well
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/state/Stores.java     |  14 +-
 .../org/apache/kafka/streams/state/StoresTest.java |  31 ++--
 .../state/internals/CachingSessionStoreTest.java   | 132 ++++++++--------
 .../state/internals/CachingWindowStoreTest.java    |  97 +++++++-----
 .../internals/RocksDBSegmentedBytesStoreTest.java  | 169 +++++++++++++--------
 .../state/internals/RocksDBWindowStoreTest.java    |  34 ++---
 .../streams/state/internals/SegmentsTest.java      |  81 +++++-----
 7 files changed, 318 insertions(+), 240 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c1b81c6..03eaa07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -29,8 +29,6 @@ import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier
 import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
@@ -74,8 +72,6 @@ import java.util.Objects;
 @InterfaceStability.Evolving
 public class Stores {
 
-    private static final Logger log = LoggerFactory.getLogger(Stores.class);
-
     /**
      * Create a persistent {@link KeyValueBytesStoreSupplier}.
      * @param name  name of the store (cannot be {@code null})
@@ -195,7 +191,7 @@ public class Stores {
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
      * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     * @param segmentInterval       size of segments in ms (must be at least one minute)
+     * @param segmentInterval       size of segments in ms (cannot be negative)
      * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
@@ -206,14 +202,14 @@ public class Stores {
                                                                  final boolean retainDuplicates,
                                                                  final long segmentInterval) {
         Objects.requireNonNull(name, "name cannot be null");
-        if (retentionPeriod < 0) {
+        if (retentionPeriod < 0L) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
         }
-        if (windowSize < 0) {
+        if (windowSize < 0L) {
             throw new IllegalArgumentException("windowSize cannot be negative");
         }
-        if (segmentInterval < 60_000) {
-            throw new IllegalArgumentException("segmentInterval must be at least one minute");
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
         }
 
         return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 23f246d..d0da158 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -38,6 +38,7 @@ public class StoresTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
+        //noinspection ResultOfMethodCallIgnored
         Stores.inMemoryKeyValueStore(null);
     }
 
@@ -53,12 +54,12 @@ public class StoresTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
-        Stores.persistentWindowStore(null, 0L, 0L, false, 60_000L);
+        Stores.persistentWindowStore(null, 0L, 0L, false, 0L);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
-        Stores.persistentWindowStore("anyName", -1L, 0L, false, 60_000L);
+        Stores.persistentWindowStore("anyName", -1L, 0L, false, 0L);
     }
 
     @Deprecated
@@ -74,7 +75,7 @@ public class StoresTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
-        Stores.persistentWindowStore("anyName", 1L, 1L, false, 59_999L);
+        Stores.persistentWindowStore("anyName", 1L, 1L, false, -1L);
     }
 
     @Test(expected = NullPointerException.class)
@@ -129,25 +130,31 @@ public class StoresTest {
 
     @Test
     public void shouldBuildWindowStore() {
-        final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3L, 3L, true),
-                                                                      Serdes.String(),
-                                                                      Serdes.String()).build();
+        final WindowStore<String, String> store = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore("store", 3L, 3L, true),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
         assertThat(store, not(nullValue()));
     }
 
     @Test
     public void shouldBuildKeyValueStore() {
-        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"),
-                                                                          Serdes.String(),
-                                                                          Serdes.String()).build();
+        final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore("name"),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
         assertThat(store, not(nullValue()));
     }
 
     @Test
     public void shouldBuildSessionStore() {
-        final SessionStore<String, String> store = Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10),
-                                                                       Serdes.String(),
-                                                                       Serdes.String()).build();
+        final SessionStore<String, String> store = Stores.sessionStoreBuilder(
+            Stores.persistentSessionStore("name", 10),
+            Serdes.String(),
+            Serdes.String()
+        ).build();
         assertThat(store, not(nullValue()));
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 194edb1..47e79c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -38,9 +37,12 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
+import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
@@ -49,14 +51,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 
+@SuppressWarnings("PointlessArithmeticExpression")
 public class CachingSessionStoreTest {
 
     private static final int MAX_CACHE_SIZE_BYTES = 600;
-    private InternalMockProcessorContext context;
+    private static final Long DEFAULT_TIMESTAMP = 10L;
+    private static final long SEGMENT_INTERVAL = 100L;
     private RocksDBSegmentedBytesStore underlying;
     private CachingSessionStore<String, String> cachingStore;
     private ThreadCache cache;
-    private static final Long DEFAULT_TIMESTAMP = 10L;
     private final Bytes keyA = Bytes.wrap("a".getBytes());
     private final Bytes keyAA = Bytes.wrap("aa".getBytes());
     private final Bytes keyB = Bytes.wrap("b".getBytes());
@@ -65,17 +68,11 @@ public class CachingSessionStoreTest {
     public void setUp() {
         final SessionKeySchema schema = new SessionKeySchema();
         schema.init("topic");
-        final int retention = 60000;
-        final int segmentInterval = 60_000;
-        underlying = new RocksDBSegmentedBytesStore("test", retention, segmentInterval, schema);
+        underlying = new RocksDBSegmentedBytesStore("test", 0L, SEGMENT_INTERVAL, schema);
         final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
-        cachingStore = new CachingSessionStore<>(sessionStore,
-                                                 Serdes.String(),
-                                                 Serdes.String(),
-                                                 segmentInterval
-                                                 );
+        cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null));
         cachingStore.init(context, cachingStore);
     }
@@ -134,11 +131,13 @@ public class CachingSessionStoreTest {
 
     @Test
     public void shouldFetchAllSessionsWithSameRecordKey() {
-        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
-                                                                              KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
-                                                                              KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
-                                                                              KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes()));
-        for (KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
+        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()),
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()),
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()),
+            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000, 1000)), "4".getBytes())
+        );
+        for (final KeyValue<Windowed<Bytes>, byte[]> kv : expected) {
             cachingStore.put(kv.key, kv.value);
         }
 
@@ -184,14 +183,14 @@ public class CachingSessionStoreTest {
 
     @Test
     public void shouldFetchCorrectlyAcrossSegments() {
-        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
-        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
-        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(120_000, 120_000));
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
         cachingStore.put(a1, "1".getBytes());
         cachingStore.put(a2, "2".getBytes());
         cachingStore.put(a3, "3".getBytes());
         cachingStore.flush();
-        final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, 60_000 * 2);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2);
         assertEquals(a1, results.next().key);
         assertEquals(a2, results.next().key);
         assertEquals(a3, results.next().key);
@@ -200,11 +199,11 @@ public class CachingSessionStoreTest {
 
     @Test
     public void shouldFetchRangeCorrectlyAcrossSegments() {
-        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
-        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(0, 0));
-        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
-        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(60_000 * 2, 60_000 * 2));
-        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(60_000 * 2, 60_000 * 2));
+        final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 0, SEGMENT_INTERVAL * 0));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 1, SEGMENT_INTERVAL * 1));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
+        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(SEGMENT_INTERVAL * 2, SEGMENT_INTERVAL * 2));
         cachingStore.put(a1, "1".getBytes());
         cachingStore.put(aa1, "1".getBytes());
         cachingStore.put(a2, "2".getBytes());
@@ -212,13 +211,13 @@ public class CachingSessionStoreTest {
         cachingStore.put(aa3, "3".getBytes());
         cachingStore.flush();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, 60_000 * 2);
-        assertEquals(a1, rangeResults.next().key);
-        assertEquals(aa1, rangeResults.next().key);
-        assertEquals(a2, rangeResults.next().key);
-        assertEquals(a3, rangeResults.next().key);
-        assertEquals(aa3, rangeResults.next().key);
-        assertFalse(rangeResults.hasNext());
+        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
+        final Set<Windowed<Bytes>> keys = new HashSet<>();
+        while (rangeResults.hasNext()) {
+            keys.add(rangeResults.next().key);
+        }
+        rangeResults.close();
+        assertEquals(mkSet(a1, a2, a3, aa1, aa3), keys);
     }
 
     @Test
@@ -226,25 +225,28 @@ public class CachingSessionStoreTest {
         final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
         final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
-                @Override
-                public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
-                    flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
-                }
-            }, true);
-        
+        cachingStore.setFlushListener(
+            (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
+            true
+        );
+
         cachingStore.put(a, "1".getBytes());
         cachingStore.flush();
-        
+
         cachingStore.put(a, "2".getBytes());
         cachingStore.flush();
 
         cachingStore.remove(a);
         cachingStore.flush();
 
-        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>("2", "1")),
-                                            KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
+        assertEquals(
+            flushed,
+            Arrays.asList(
+                KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                KeyValue.pair(aDeserialized, new Change<>("2", "1")),
+                KeyValue.pair(aDeserialized, new Change<>(null, "2"))
+            )
+        );
     }
 
     @Test
@@ -252,12 +254,10 @@ public class CachingSessionStoreTest {
         final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
         final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
-                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
-            }
-        }, false);
+        cachingStore.setFlushListener(
+            (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
+            false
+        );
 
         cachingStore.put(a, "1".getBytes());
         cachingStore.flush();
@@ -268,9 +268,14 @@ public class CachingSessionStoreTest {
         cachingStore.remove(a);
         cachingStore.flush();
 
-        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>("2", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>(null, "2"))));
+        assertEquals(
+            flushed,
+            Arrays.asList(
+                KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                KeyValue.pair(aDeserialized, new Change<>("2", null)),
+                KeyValue.pair(aDeserialized, new Change<>(null, "2"))
+            )
+        );
     }
 
     @Test
@@ -278,12 +283,10 @@ public class CachingSessionStoreTest {
         final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0));
         final List<KeyValue<Windowed<String>, Change<String>>> flushed = new ArrayList<>();
-        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, String>() {
-            @Override
-            public void apply(final Windowed<String> key, final String newValue, final String oldValue) {
-                flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
-            }
-        }, false);
+        cachingStore.setFlushListener(
+            (key, newValue, oldValue) -> flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue))),
+            false
+        );
 
         cachingStore.put(a, "1".getBytes());
         cachingStore.flush();
@@ -292,8 +295,13 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
 
 
-        assertEquals(flushed, Arrays.asList(KeyValue.pair(aDeserialized, new Change<>("1", null)),
-                                            KeyValue.pair(aDeserialized, new Change<>("2", null))));
+        assertEquals(
+            flushed,
+            Arrays.asList(
+                KeyValue.pair(aDeserialized, new Change<>("1", null)),
+                KeyValue.pair(aDeserialized, new Change<>("2", null))
+            )
+        );
     }
 
     @Test
@@ -369,7 +377,7 @@ public class CachingSessionStoreTest {
         cachingStore.put(null, "1".getBytes());
     }
 
-    private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final String...sessionIds) {
+    private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(final String... sessionIds) {
         final Random random = new Random();
         final List<KeyValue<Windowed<Bytes>, byte[]>> results = new ArrayList<>();
         while (cache.size() == results.size()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 118acec..3bcb1a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -35,7 +34,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.List;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
@@ -55,7 +53,8 @@ public class CachingWindowStoreTest {
 
     private static final int MAX_CACHE_SIZE_BYTES = 150;
     private static final long DEFAULT_TIMESTAMP = 10L;
-    private static final Long WINDOW_SIZE = 10000L;
+    private static final Long WINDOW_SIZE = 10L;
+    private static final long SEGMENT_INTERVAL = 100L;
     private InternalMockProcessorContext context;
     private RocksDBSegmentedBytesStore underlying;
     private CachingWindowStore<String, String> cachingStore;
@@ -67,20 +66,14 @@ public class CachingWindowStoreTest {
     @Before
     public void setUp() {
         keySchema = new WindowKeySchema();
-        final int retention = 60_000;
-        final int segmentInterval = 60_000;
-        underlying = new RocksDBSegmentedBytesStore("test", retention, segmentInterval, keySchema);
+        underlying = new RocksDBSegmentedBytesStore("test", 0, SEGMENT_INTERVAL, keySchema);
         final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, WINDOW_SIZE);
         cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
-        cachingStore = new CachingWindowStore<>(windowStore,
-                                                Serdes.String(),
-                                                Serdes.String(),
-                                                WINDOW_SIZE,
-                                                segmentInterval);
+        cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(), Serdes.String(), WINDOW_SIZE, SEGMENT_INTERVAL);
         cachingStore.setFlushListener(cacheListener, false);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic, null));
         cachingStore.init(context, cachingStore);
     }
@@ -133,7 +126,7 @@ public class CachingWindowStoreTest {
         assertFalse(iterator.hasNext());
         assertEquals(2, cache.size());
     }
-    
+
     @Test
     public void shouldGetAllFromCache() {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
@@ -146,46 +139,46 @@ public class CachingWindowStoreTest {
         cachingStore.put(bytesKey("h"), bytesValue("h"));
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all();
-        String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
-        for (String s : array) {
+        final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+        for (final String s : array) {
             verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), s);
         }
         assertFalse(iterator.hasNext());
     }
-    
+
     @Test
     public void shouldFetchAllWithinTimestampRange() {
-        String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+        final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
         for (int i = 0; i < array.length; i++) {
             context.setTime(i);
             cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
         }
-        
+
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.fetchAll(0, 7);
         for (int i = 0; i < array.length; i++) {
-            String str = array[i];
+            final String str = array[i];
             verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
         }
         assertFalse(iterator.hasNext());
-        
+
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 = cachingStore.fetchAll(2, 4);
         for (int i = 2; i <= 4; i++) {
-            String str = array[i];
+            final String str = array[i];
             verifyWindowedKeyValue(iterator1.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
         }
         assertFalse(iterator1.hasNext());
-        
+
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 = cachingStore.fetchAll(5, 7);
         for (int i = 5; i <= 7; i++) {
-            String str = array[i];
+            final String str = array[i];
             verifyWindowedKeyValue(iterator2.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
         }
         assertFalse(iterator2.hasNext());
     }
 
     @Test
-    public void shouldFlushEvictedItemsIntoUnderlyingStore() throws IOException {
-        int added = addItemsToCache();
+    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
+        final int added = addItemsToCache();
         // all dirty entries should have been flushed
         final KeyValueIterator<Bytes, byte[]> iter = underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
         final KeyValue<Bytes, byte[]> next = iter.next();
@@ -228,8 +221,8 @@ public class CachingWindowStoreTest {
     }
 
     @Test
-    public void shouldForwardDirtyItemToListenerWhenEvicted() throws IOException {
-        int numRecords = addItemsToCache();
+    public void shouldForwardDirtyItemToListenerWhenEvicted() {
+        final int numRecords = addItemsToCache();
         assertEquals(numRecords, cacheListener.forwarded.size());
     }
 
@@ -257,7 +250,7 @@ public class CachingWindowStoreTest {
 
     @Test
     public void shouldIterateCacheAndStore() {
-        final Bytes key = Bytes.wrap("1" .getBytes());
+        final Bytes key = Bytes.wrap("1".getBytes());
         underlying.put(WindowKeySchema.toStoreKeyBinary(key, DEFAULT_TIMESTAMP, 0), "a".getBytes());
         cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);
         final WindowStoreIterator<byte[]> fetch = cachingStore.fetch(bytesKey("1"), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE);
@@ -268,7 +261,7 @@ public class CachingWindowStoreTest {
 
     @Test
     public void shouldIterateCacheAndStoreKeyRange() {
-        final Bytes key = Bytes.wrap("1" .getBytes());
+        final Bytes key = Bytes.wrap("1".getBytes());
         underlying.put(WindowKeySchema.toStoreKeyBinary(key, DEFAULT_TIMESTAMP, 0), "a".getBytes());
         cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);
 
@@ -311,9 +304,13 @@ public class CachingWindowStoreTest {
         cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
         cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
         cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
-        cachingStore.put(bytesKey("a"), bytesValue("0005"), 60000);
+        cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
 
-        final List<KeyValue<Long, byte[]>> expected = mkList(KeyValue.pair(0L, bytesValue("0001")), KeyValue.pair(1L, bytesValue("0003")), KeyValue.pair(60000L, bytesValue("0005")));
+        final List<KeyValue<Long, byte[]>> expected = mkList(
+            KeyValue.pair(0L, bytesValue("0001")),
+            KeyValue.pair(1L, bytesValue("0003")),
+            KeyValue.pair(SEGMENT_INTERVAL, bytesValue("0005"))
+        );
         final List<KeyValue<Long, byte[]>> actual = toList(cachingStore.fetch(bytesKey("a"), 0, Long.MAX_VALUE));
         verifyKeyValueList(expected, actual);
     }
@@ -324,16 +321,32 @@ public class CachingWindowStoreTest {
         cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
         cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
         cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
-        cachingStore.put(bytesKey("a"), bytesValue("0005"), 60000);
-
-        verifyKeyValueList(mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("a", "0005", 60000L)),
-                           toList(cachingStore.fetch(bytesKey("a"), bytesKey("a"), 0, Long.MAX_VALUE)));
-
-        verifyKeyValueList(mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1)),
-                           toList(cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0, Long.MAX_VALUE)));
-
-        verifyKeyValueList(mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1), windowedPair("a", "0005", 60000L)),
-                           toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 0, Long.MAX_VALUE)));
+        cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
+
+        verifyKeyValueList(
+            mkList(
+                windowedPair("a", "0001", 0),
+                windowedPair("a", "0003", 1),
+                windowedPair("a", "0005", SEGMENT_INTERVAL)
+            ),
+            toList(cachingStore.fetch(bytesKey("a"), bytesKey("a"), 0, Long.MAX_VALUE))
+        );
+
+        verifyKeyValueList(
+            mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1)),
+            toList(cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0, Long.MAX_VALUE))
+        );
+
+        verifyKeyValueList(
+            mkList(
+                windowedPair("a", "0001", 0),
+                windowedPair("a", "0003", 1),
+                windowedPair("aa", "0002", 0),
+                windowedPair("aa", "0004", 1),
+                windowedPair("a", "0005", SEGMENT_INTERVAL)
+            ),
+            toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 0, Long.MAX_VALUE))
+        );
     }
 
     @Test(expected = NullPointerException.class)
@@ -361,7 +374,7 @@ public class CachingWindowStoreTest {
         cachingStore.fetch(bytesKey("anyFrom"), null, 1L, 2L);
     }
 
-    private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(String key, String value, long timestamp) {
+    private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(final String key, final String value, final long timestamp) {
         return KeyValue.pair(new Windowed<>(bytesKey(key), new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), bytesValue(value));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 8e69ccb..cffd73f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -38,13 +38,10 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.rocksdb.WriteBatch;
 
-import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
-
-
 import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -52,12 +49,13 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SimpleTimeZone;
 
+import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -68,14 +66,13 @@ import static org.junit.Assert.assertTrue;
 @RunWith(Parameterized.class)
 public class RocksDBSegmentedBytesStoreTest {
 
+    private final long windowSizeForTimeWindow = 500;
     private final long retention = 1000;
-    private final long segmentInterval = 60_000;
-    private final int numSegments = 3;
+    private final long segmentInterval = 60_000L;
     private InternalMockProcessorContext context;
     private final String storeName = "bytes-store";
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
-    private long windowSizeForTimeWindow = 500;
     private final Window[] windows = new Window[4];
 
     @Parameter
@@ -83,7 +80,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
     @Parameters(name = "{0}")
     public static Object[] getKeySchemas() {
-        return new Object[]{new SessionKeySchema(), new WindowKeySchema()};
+        return new Object[] {new SessionKeySchema(), new WindowKeySchema()};
     }
 
     @Before
@@ -94,29 +91,32 @@ public class RocksDBSegmentedBytesStoreTest {
             windows[0] = new SessionWindow(10, 10);
             windows[1] = new SessionWindow(500, 1000);
             windows[2] = new SessionWindow(1000, 1500);
-            windows[3] = new SessionWindow(30000, 60000);
+            windows[3] = new SessionWindow(30_000L, 60_000L);
         }
         if (schema instanceof WindowKeySchema) {
 
             windows[0] = timeWindowForSize(10, windowSizeForTimeWindow);
             windows[1] = timeWindowForSize(500, windowSizeForTimeWindow);
             windows[2] = timeWindowForSize(1000, windowSizeForTimeWindow);
-            windows[3] = timeWindowForSize(60000, windowSizeForTimeWindow);
+            windows[3] = timeWindowForSize(60_000L, windowSizeForTimeWindow);
         }
 
 
-        bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                retention,
-                segmentInterval,
-                schema);
+        bytesStore = new RocksDBSegmentedBytesStore(
+            storeName,
+            retention,
+            segmentInterval,
+            schema
+        );
 
         stateDir = TestUtils.tempDirectory();
         context = new InternalMockProcessorContext(
-                stateDir,
-                Serdes.String(),
-                Serdes.Long(),
-                new NoOpRecordCollector(),
-                new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
+            stateDir,
+            Serdes.String(),
+            Serdes.Long(),
+            new NoOpRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
         bytesStore.init(context, bytesStore);
     }
 
@@ -134,8 +134,10 @@ public class RocksDBSegmentedBytesStoreTest {
 
         final KeyValueIterator<Bytes, byte[]> values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500);
 
-        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+            KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+            KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+        );
 
         assertEquals(expected, toList(values));
     }
@@ -147,8 +149,10 @@ public class RocksDBSegmentedBytesStoreTest {
         bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50));
         bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100));
         final KeyValueIterator<Bytes, byte[]> results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1, 999);
-        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
-                KeyValue.pair(new Windowed<>(key, windows[1]), 50L));
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+            KeyValue.pair(new Windowed<>(key, windows[0]), 10L),
+            KeyValue.pair(new Windowed<>(key, windows[1]), 50L)
+        );
 
         assertEquals(expected, toList(results));
     }
@@ -181,9 +185,14 @@ public class RocksDBSegmentedBytesStoreTest {
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1500));
 
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+        assertEquals(
+            Arrays.asList(
+                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
                 KeyValue.pair(new Windowed<>(key, windows[1]), 100L),
-                KeyValue.pair(new Windowed<>(key, windows[2]), 500L)), results);
+                KeyValue.pair(new Windowed<>(key, windows[2]), 500L)
+            ),
+            results
+        );
 
     }
 
@@ -198,13 +207,22 @@ public class RocksDBSegmentedBytesStoreTest {
         assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                segments.segmentName(1)), segmentDirs());
+        assertEquals(
+            Utils.mkSet(
+                segments.segmentName(0),
+                segments.segmentName(1)
+            ),
+            segmentDirs()
+        );
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+        assertEquals(
+            Arrays.asList(
+                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
                 KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
-        ), results);
+            ),
+            results
+        );
 
     }
 
@@ -218,18 +236,27 @@ public class RocksDBSegmentedBytesStoreTest {
         assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs());
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
-        assertEquals(Utils.mkSet(segments.segmentName(0),
-                segments.segmentName(1)), segmentDirs());
-
-        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60000L));
-        assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+        assertEquals(
+            Utils.mkSet(
+                segments.segmentName(0),
+                segments.segmentName(1)
+            ),
+            segmentDirs()
+        );
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetchAll(0L, 60_000L));
+        assertEquals(
+            Arrays.asList(
+                KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
                 KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
-        ), results);
+            ),
+            results
+        );
 
     }
 
     @Test
-    public void shouldLoadSegementsWithOldStyleDateFormattedName() {
+    public void shouldLoadSegmentsWithOldStyleDateFormattedName() {
         final Segments segments = new Segments(storeName, retention, segmentInterval);
         final String key = "a";
 
@@ -247,20 +274,29 @@ public class RocksDBSegmentedBytesStoreTest {
         final File oldStyleName = new File(parent, nameParts[0] + "-" + formatted);
         assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
 
-        bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                retention,
-                segmentInterval,
-                schema);
+        bytesStore = new RocksDBSegmentedBytesStore(
+            storeName,
+            retention,
+            segmentInterval,
+            schema
+        );
 
         bytesStore.init(context, bytesStore);
-        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
-        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
-                KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
+        assertThat(
+            results,
+            equalTo(
+                Arrays.asList(
+                    KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+                    KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+                )
+            )
+        );
     }
 
 
     @Test
-    public void shouldLoadSegementsWithOldStyleColonFormattedName() {
+    public void shouldLoadSegmentsWithOldStyleColonFormattedName() {
         final Segments segments = new Segments(storeName, retention, segmentInterval);
         final String key = "a";
 
@@ -274,15 +310,24 @@ public class RocksDBSegmentedBytesStoreTest {
         final File oldStyleName = new File(parent, nameParts[0] + ":" + Long.parseLong(nameParts[1]));
         assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
 
-        bytesStore = new RocksDBSegmentedBytesStore(storeName,
-                retention,
-                segmentInterval,
-                schema);
+        bytesStore = new RocksDBSegmentedBytesStore(
+            storeName,
+            retention,
+            segmentInterval,
+            schema
+        );
 
         bytesStore.init(context, bytesStore);
-        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
-        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
-                KeyValue.pair(new Windowed<>(key, windows[3]), 100L))));
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
+        assertThat(
+            results,
+            equalTo(
+                Arrays.asList(
+                    KeyValue.pair(new Windowed<>(key, windows[0]), 50L),
+                    KeyValue.pair(new Windowed<>(key, windows[3]), 100L)
+                )
+            )
+        );
     }
 
 
@@ -304,7 +349,7 @@ public class RocksDBSegmentedBytesStoreTest {
         records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[3])).get(), serializeValue(100L)));
         final Map<Segment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
         assertEquals(2, writeBatchMap.size());
-        for (final WriteBatch batch: writeBatchMap.values()) {
+        for (final WriteBatch batch : writeBatchMap.values()) {
             assertEquals(1, batch.count());
         }
     }
@@ -323,7 +368,7 @@ public class RocksDBSegmentedBytesStoreTest {
         assertEquals(2, bytesStore.getSegments().size());
 
         // Bulk loading is enabled during recovery.
-        for (final Segment segment: bytesStore.getSegments()) {
+        for (final Segment segment : bytesStore.getSegments()) {
             Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
         }
 
@@ -347,20 +392,20 @@ public class RocksDBSegmentedBytesStoreTest {
 
         restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
 
-        for (final Segment segment: bytesStore.getSegments()) {
+        for (final Segment segment : bytesStore.getSegments()) {
             Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
         }
 
         restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
-        for (final Segment segment: bytesStore.getSegments()) {
+        for (final Segment segment : bytesStore.getSegments()) {
             Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4));
         }
     }
 
     private Set<String> segmentDirs() {
-        File windowDir = new File(stateDir, storeName);
+        final File windowDir = new File(stateDir, storeName);
 
-        return new HashSet<>(Arrays.asList(windowDir.list()));
+        return Utils.mkSet(Objects.requireNonNull(windowDir.list()));
     }
 
     private byte[] serializeValue(final long value) {
@@ -383,14 +428,14 @@ public class RocksDBSegmentedBytesStoreTest {
             final KeyValue<Bytes, byte[]> next = iterator.next();
             if (schema instanceof WindowKeySchema) {
                 final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
-                        WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow, stateSerdes),
-                        stateSerdes.valueDeserializer().deserialize("dummy", next.value)
+                    WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow, stateSerdes),
+                    stateSerdes.valueDeserializer().deserialize("dummy", next.value)
                 );
                 results.add(deserialized);
             } else {
                 final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
-                        SessionKeySchema.from(next.key.get(), stateSerdes.keyDeserializer(), "dummy"),
-                        stateSerdes.valueDeserializer().deserialize("dummy", next.value)
+                    SessionKeySchema.from(next.key.get(), stateSerdes.keyDeserializer(), "dummy"),
+                    stateSerdes.valueDeserializer().deserialize("dummy", next.value)
                 );
                 results.add(deserialized);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index ac481a7..b0057e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -68,9 +68,9 @@ public class RocksDBWindowStoreTest {
 
     private final int numSegments = 3;
     private final long windowSize = 3L;
-    private final String windowName = "window";
-    private final long segmentInterval = 60_000;
+    private final long segmentInterval = 600L;
     private final long retentionPeriod = segmentInterval * (numSegments - 1);
+    private final String windowName = "window";
     private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
 
@@ -145,8 +145,8 @@ public class RocksDBWindowStoreTest {
         windowStore.put(1, "four");
 
         // should only have 2 values as the first segment is no longer open
-        assertEquals(new KeyValue<>(60000L, "two"), iterator.next());
-        assertEquals(new KeyValue<>(120000L, "three"), iterator.next());
+        assertEquals(new KeyValue<>(segmentInterval, "two"), iterator.next());
+        assertEquals(new KeyValue<>(2 * segmentInterval, "three"), iterator.next());
         assertFalse(iterator.hasNext());
     }
 
@@ -639,7 +639,7 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(59999);
+        setCurrentTime(segmentInterval - 1);
         windowStore.put(0, "v");
         windowStore.put(0, "v");
         assertEquals(
@@ -647,7 +647,7 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(60000);
+        setCurrentTime(segmentInterval);
         windowStore.put(0, "v");
         assertEquals(
             Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
@@ -657,7 +657,7 @@ public class RocksDBWindowStoreTest {
         WindowStoreIterator iter;
         int fetchedCount;
 
-        iter = windowStore.fetch(0, 0L, 240000L);
+        iter = windowStore.fetch(0, 0L, segmentInterval * 4);
         fetchedCount = 0;
         while (iter.hasNext()) {
             iter.next();
@@ -670,10 +670,10 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(180000);
+        setCurrentTime(segmentInterval * 3);
         windowStore.put(0, "v");
 
-        iter = windowStore.fetch(0, 0L, 240000L);
+        iter = windowStore.fetch(0, 0L, segmentInterval * 4);
         fetchedCount = 0;
         while (iter.hasNext()) {
             iter.next();
@@ -686,10 +686,10 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(300000);
+        setCurrentTime(segmentInterval * 5);
         windowStore.put(0, "v");
 
-        iter = windowStore.fetch(0, 240000L, 1000000L);
+        iter = windowStore.fetch(0, segmentInterval * 4, segmentInterval * 10);
         fetchedCount = 0;
         while (iter.hasNext()) {
             iter.next();
@@ -847,9 +847,9 @@ public class RocksDBWindowStoreTest {
 
         windowStore.init(context, windowStore);
 
-        final Bytes key1 = Bytes.wrap(new byte[]{0});
-        final Bytes key2 = Bytes.wrap(new byte[]{0, 0});
-        final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0});
+        final Bytes key1 = Bytes.wrap(new byte[] {0});
+        final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
+        final Bytes key3 = Bytes.wrap(new byte[] {0, 0, 0});
         windowStore.put(key1, "1", 0);
         windowStore.put(key2, "2", 0);
         windowStore.put(key3, "3", 0);
@@ -924,11 +924,7 @@ public class RocksDBWindowStoreTest {
             final Integer key = WindowKeySchema.extractStoreKey(entry.key, serdes);
             final String value = entry.value == null ? null : serdes.valueFrom(entry.value);
 
-            Set<String> entries = entriesByKey.get(key);
-            if (entries == null) {
-                entries = new HashSet<>();
-                entriesByKey.put(key, entries);
-            }
+            final Set<String> entries = entriesByKey.computeIfAbsent(key, k -> new HashSet<>());
             entries.add(value + "@" + (timestamp - startTime));
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 1fc0853..efed24f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -44,22 +44,24 @@ import static org.junit.Assert.assertTrue;
 public class SegmentsTest {
 
     private static final int NUM_SEGMENTS = 5;
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
     private InternalMockProcessorContext context;
     private Segments segments;
-    private final long segmentInterval = 60_000L;
     private File stateDirectory;
-    private String storeName = "test";
-    private final int retentionPeriod =  4 * 60 * 1000;
+    private final String storeName = "test";
 
     @Before
     public void createContext() {
         stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(stateDirectory,
-                                           Serdes.String(),
-                                           Serdes.Long(),
-                                           new NoOpRecordCollector(),
-                                           new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        segments = new Segments(storeName, retentionPeriod, segmentInterval);
+        context = new InternalMockProcessorContext(
+            stateDirectory,
+            Serdes.String(),
+            Serdes.Long(),
+            new NoOpRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new Segments(storeName, RETENTION_PERIOD, SEGMENT_INTERVAL);
     }
 
     @After
@@ -70,24 +72,24 @@ public class SegmentsTest {
     @Test
     public void shouldGetSegmentIdsFromTimestamp() {
         assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(60000));
-        assertEquals(2, segments.segmentId(120000));
-        assertEquals(3, segments.segmentId(180000));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
     }
 
     @Test
     public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
-        final Segments segments = new Segments("test", 8 * 60 * 1000, 120_000);
+        final Segments segments = new Segments("test", 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
         assertEquals(0, segments.segmentId(0));
-        assertEquals(0, segments.segmentId(60000));
-        assertEquals(1, segments.segmentId(120000));
+        assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
     }
 
     @Test
-    public void shouldGetSegmentNameFromId() throws Exception {
+    public void shouldGetSegmentNameFromId() {
         assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + segmentInterval, segments.segmentName(1));
-        assertEquals("test." + 2 * segmentInterval, segments.segmentName(2));
+        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
+        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
     }
 
     @Test
@@ -96,11 +98,11 @@ public class SegmentsTest {
         final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
         final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context);
         assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + segmentInterval).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * segmentInterval).isDirectory());
-        assertEquals(true, segment1.isOpen());
-        assertEquals(true, segment2.isOpen());
-        assertEquals(true, segment3.isOpen());
+        assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory());
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
     }
 
     @Test
@@ -114,14 +116,14 @@ public class SegmentsTest {
     public void shouldCleanupSegmentsThatHaveExpired() {
         final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
         final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
-        context.setStreamTime(segmentInterval * 7);
+        context.setStreamTime(SEGMENT_INTERVAL * 7);
         final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context);
         assertFalse(segment1.isOpen());
         assertFalse(segment2.isOpen());
         assertTrue(segment3.isOpen());
         assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + segmentInterval).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * segmentInterval).exists());
+        assertFalse(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).exists());
+        assertTrue(new File(context.stateDir(), "test/test." + 7 * SEGMENT_INTERVAL).exists());
     }
 
     @Test
@@ -151,6 +153,7 @@ public class SegmentsTest {
 
     @Test
     public void shouldOpenExistingSegments() {
+        segments = new Segments("test", 4, 1);
         segments.getOrCreateSegmentIfLive(0, context);
         segments.getOrCreateSegmentIfLive(1, context);
         segments.getOrCreateSegmentIfLive(2, context);
@@ -159,7 +162,7 @@ public class SegmentsTest {
         // close existing.
         segments.close();
 
-        segments = new Segments("test", 4 * 60 * 1000, 60_000);
+        segments = new Segments("test", 4, 1);
         segments.openExisting(context);
 
         assertTrue(segments.getSegmentForTimestamp(0).isOpen());
@@ -182,7 +185,7 @@ public class SegmentsTest {
         segments.getOrCreateSegmentIfLive(3, context);
         segments.getOrCreateSegmentIfLive(4, context);
 
-        final List<Segment> segments = this.segments.segments(0, 2 * 60 * 1000);
+        final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
         assertEquals(3, segments.size());
         assertEquals(0, segments.get(0).id);
         assertEquals(1, segments.get(1).id);
@@ -190,14 +193,14 @@ public class SegmentsTest {
     }
 
     @Test
-    public void shouldGetSegmentsWithinTimeRangeOutOfOrder() throws Exception {
+    public void shouldGetSegmentsWithinTimeRangeOutOfOrder() {
         updateStreamTimeAndCreateSegment(4);
         updateStreamTimeAndCreateSegment(2);
         updateStreamTimeAndCreateSegment(0);
         updateStreamTimeAndCreateSegment(1);
         updateStreamTimeAndCreateSegment(3);
 
-        final List<Segment> segments = this.segments.segments(0, 2 * 60 * 1000);
+        final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
         assertEquals(3, segments.size());
         assertEquals(0, segments.get(0).id);
         assertEquals(1, segments.get(1).id);
@@ -241,14 +244,19 @@ public class SegmentsTest {
     }
 
     private void updateStreamTimeAndCreateSegment(final int segment) {
-        context.setStreamTime(segmentInterval * segment);
+        context.setStreamTime(SEGMENT_INTERVAL * segment);
         segments.getOrCreateSegmentIfLive(segment, context);
     }
 
     @Test
     public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
+        final long segmentInterval = 60_000L; // the old segment file's naming system maxes out at 1 minute granularity.
+
+        segments = new Segments(storeName, NUM_SEGMENTS * segmentInterval, segmentInterval);
+
         final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
         final File storeDirectory = new File(storeDirectoryPath);
+        //noinspection ResultOfMethodCallIgnored
         storeDirectory.mkdirs();
 
         final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
@@ -256,13 +264,15 @@ public class SegmentsTest {
 
         for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
             final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval)));
+            //noinspection ResultOfMethodCallIgnored
             oldSegment.createNewFile();
         }
 
         segments.openExisting(context);
 
         for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
-            final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+            final String segmentName = storeName + "." + (long) segmentId * segmentInterval;
+            final File newSegment = new File(storeDirectoryPath + File.separator + segmentName);
             assertTrue(newSegment.exists());
         }
     }
@@ -271,17 +281,19 @@ public class SegmentsTest {
     public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Exception {
         final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
         final File storeDirectory = new File(storeDirectoryPath);
+        //noinspection ResultOfMethodCallIgnored
         storeDirectory.mkdirs();
 
         for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
-            final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+            final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
+            //noinspection ResultOfMethodCallIgnored
             oldSegment.createNewFile();
         }
 
         segments.openExisting(context);
 
         for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
-            final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+            final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
             assertTrue(newSegment.exists());
         }
     }
@@ -292,6 +304,7 @@ public class SegmentsTest {
         segments.close();
         assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
     }
+
     private void verifyCorrectSegments(final long first, final int numSegments) {
         final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
         assertEquals(numSegments, result.size());


Mime
View raw message