kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-6360: Clear RocksDB Segments when store is closed
Date Thu, 14 Dec 2017 17:51:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d1a9252ca -> 1cf7ec87d


KAFKA-6360: Clear RocksDB Segments when store is closed

Now that we support re-initializing state stores, we need to clear the segments when the store
is closed so that they can be re-opened.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>,
Ted Yu <yuzhihong@gmail.com>

Closes #4324 from dguy/kafka-6360


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cf7ec87
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cf7ec87
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cf7ec87

Branch: refs/heads/trunk
Commit: 1cf7ec87d3afeaab7b4f40d387d34e91e9616e64
Parents: d1a9252
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Dec 14 09:51:56 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Dec 14 09:51:56 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/state/internals/Segments.java  | 1 +
 .../state/internals/RocksDBSegmentedBytesStoreTest.java     | 9 +++++++++
 .../apache/kafka/streams/state/internals/SegmentsTest.java  | 9 +++++++++
 3 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cf7ec87/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 2cf0913..7b3336a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -169,6 +169,7 @@ class Segments {
         for (Segment segment : segments.values()) {
             segment.close();
         }
+        segments.clear();
     }
 
     private Segment getSegment(long segmentId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cf7ec87/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index d8d291c..df50b2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -258,6 +258,15 @@ public class RocksDBSegmentedBytesStoreTest {
             KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
     }
 
+    @Test
+    public void shouldBeAbleToWriteToReInitializedStore() {
+        final String key = "a";
+        // need to create a segment so we can attempt to write to it again.
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+        bytesStore.close();
+        bytesStore.init(context, bytesStore);
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+    }
 
     private Set<String> segmentDirs() {
         File windowDir = new File(stateDir, storeName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cf7ec87/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 65cfb21..46606de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -33,6 +33,9 @@ import java.util.Date;
 import java.util.List;
 import java.util.SimpleTimeZone;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -250,6 +253,12 @@ public class SegmentsTest {
         }
     }
 
+    @Test
+    public void shouldClearSegmentsOnClose() {
+        segments.getOrCreateSegment(0, context);
+        segments.close();
+        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+    }
     private void verifyCorrectSegments(final long first, final int numSegments) {
         final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
         assertEquals(numSegments, result.size());


Mime
View raw message