kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5515; Remove date formatting from Segments
Date Mon, 18 Sep 2017 11:12:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk be6252d8e -> ed0e69214


KAFKA-5515; Remove date formatting from Segments

Remove date formatting from `Segments` and use the `segmentId` instead.
Add tests to make sure old segments can still be loaded.
Rename old segment dirs to the new naming format at load time.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: tedyu <yuzhihong@gmail.com>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3783 from dguy/kafka-5515


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed0e6921
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed0e6921
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed0e6921

Branch: refs/heads/trunk
Commit: ed0e692147d81e396bf10f4d9425516d51bd52cc
Parents: be6252d
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Sep 18 12:11:56 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Mon Sep 18 12:11:56 2017 +0100

----------------------------------------------------------------------
 .../internals/RocksDBSegmentedBytesStore.java   |  1 +
 .../kafka/streams/state/internals/Segments.java | 46 +++++++++++++++-----
 .../RocksDBSegmentedBytesStoreTest.java         | 39 ++++++++++++++++-
 .../streams/state/internals/SegmentsTest.java   | 29 ++++++------
 4 files changed, 91 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed0e6921/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index f3c4639..4d4ee41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -138,4 +138,5 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     public boolean isOpen() {
         return open;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed0e6921/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 9c8653a..7c6bb53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -61,12 +62,14 @@ class Segments {
         this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
     }
 
-    long segmentId(long timestamp) {
+    long segmentId(final long timestamp) {
         return timestamp / segmentInterval;
     }
 
-    String segmentName(long segmentId) {
-        return name + "-" + formatter.format(new Date(segmentId * segmentInterval));
+    String segmentName(final long segmentId) {
+        // previous format used - as a separator so if this changes in the future
+        // then we should use something different.
+        return name + ":" + segmentId * segmentInterval;
     }
 
     Segment getSegmentForTimestamp(final long timestamp) {
@@ -101,7 +104,7 @@ class Segments {
                 if (list != null) {
                     long[] segmentIds = new long[list.length];
                     for (int i = 0; i < list.length; i++)
-                        segmentIds[i] = segmentIdFromSegmentName(list[i]);
+                        segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
 
                     // open segments in the id order
                     Arrays.sort(segmentIds);
@@ -185,12 +188,35 @@ class Segments {
         }
     }
 
-    private long segmentIdFromSegmentName(String segmentName) {
-        try {
-            Date date = formatter.parse(segmentName.substring(name.length() + 1));
-            return date.getTime() / segmentInterval;
-        } catch (Exception ex) {
-            return -1L;
+    private long segmentIdFromSegmentName(final String segmentName,
+                                          final File parent) {
+        // old style segment name with date
+        if (segmentName.charAt(name.length()) == '-') {
+            final String datePart = segmentName.substring(name.length() + 1);
+            final Date date;
+            try {
+                date = formatter.parse(datePart);
+                final long segmentId = date.getTime() / segmentInterval;
+                final File newName = new File(parent, segmentName(segmentId));
+                final File oldName = new File(parent, segmentName);
+                if (!oldName.renameTo(newName)) {
+                    throw new ProcessorStateException("Unable to rename old style segment
from: "
+                                                              + oldName
+                                                              + " to new name: "
+                                                              + newName);
+                }
+                return segmentId;
+            } catch (ParseException e) {
+                log.warn("Unable to parse segmentName {} to a date. This segment will be
skipped", segmentName);
+                return -1L;
+            }
+        } else {
+            try {
+                return Long.parseLong(segmentName.substring(name.length() + 1)) / segmentInterval;
+            } catch (NumberFormatException e) {
+                throw new ProcessorStateException("Unable to parse segment id as long from
segmentName: " + segmentName);
+            }
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed0e6921/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 9cb6ee5..2dfc84b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -35,15 +35,22 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.SimpleTimeZone;
 
+import static org.apache.kafka.streams.state.internals.Segments.segmentInterval;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class RocksDBSegmentedBytesStoreTest {
 
@@ -53,10 +60,10 @@ public class RocksDBSegmentedBytesStoreTest {
     private final String storeName = "bytes-store";
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
+    private final SessionKeySchema schema = new SessionKeySchema();
 
     @Before
     public void before() {
-        final SessionKeySchema schema = new SessionKeySchema();
         schema.init("topic");
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
                                                     retention,
@@ -148,6 +155,36 @@ public class RocksDBSegmentedBytesStoreTest {
 
     }
 
+    @Test
+    public void shouldLoadSegementsWithOldStyleDateFormattedName() {
+        final Segments segments = new Segments(storeName, retention, numSegments);
+        final String key = "a";
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))),
serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))),
serializeValue(100L));
+        bytesStore.close();
+
+        final String firstSegmentName = segments.segmentName(0);
+        final String[] nameParts = firstSegmentName.split(":");
+        final Long segmentId = Long.parseLong(nameParts[1]);
+        final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
+        formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
+        final String formatted = formatter.format(new Date(segmentId * segmentInterval(retention,
numSegments)));
+        final File parent = new File(stateDir, storeName);
+        final File oldStyleName = new File(parent, nameParts[0] + "-" + formatted);
+        assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
+
+        bytesStore = new RocksDBSegmentedBytesStore(storeName,
+                                                    retention,
+                                                    numSegments,
+                                                    schema);
+
+        bytesStore.init(context, bytesStore);
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()),
0L, 60000L));
+        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key,
new SessionWindow(0L, 0L)), 50L),
+                                                  KeyValue.pair(new Windowed<>(key,
new SessionWindow(30000L, 60000L)), 100L))));
+    }
+
+
     private Set<String> segmentDirs() {
         File windowDir = new File(stateDir, storeName);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed0e6921/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 37e0d92..0646b7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -40,6 +40,7 @@ public class SegmentsTest {
     private static final int NUM_SEGMENTS = 5;
     private MockProcessorContext context;
     private Segments segments;
+    private long segmentInterval;
 
     @Before
     public void createContext() {
@@ -48,7 +49,9 @@ public class SegmentsTest {
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
                                            new ThreadCache(new LogContext("testCache "),
0, new MockStreamsMetrics(new Metrics())));
-        segments = new Segments("test", 4 * 60 * 1000, NUM_SEGMENTS);
+        int retentionPeriod = 4 * 60 * 1000;
+        segments = new Segments("test", retentionPeriod, NUM_SEGMENTS);
+        segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
     }
 
     @After
@@ -74,10 +77,10 @@ public class SegmentsTest {
     }
 
     @Test
-    public void shouldGetSegmentNameFromId() {
-        assertEquals("test-197001010000", segments.segmentName(0));
-        assertEquals("test-197001010001", segments.segmentName(1));
-        assertEquals("test-197001010002", segments.segmentName(2));
+    public void shouldGetSegmentNameFromId() throws Exception {
+        assertEquals("test:0", segments.segmentName(0));
+        assertEquals("test:" + segmentInterval, segments.segmentName(1));
+        assertEquals("test:" + 2 * segmentInterval, segments.segmentName(2));
     }
 
     @Test
@@ -85,9 +88,9 @@ public class SegmentsTest {
         final Segment segment1 = segments.getOrCreateSegment(0, context);
         final Segment segment2 = segments.getOrCreateSegment(1, context);
         final Segment segment3 = segments.getOrCreateSegment(2, context);
-        assertTrue(new File(context.stateDir(), "test/test-197001010000").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test-197001010001").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test-197001010002").isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test:0").isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test:" + segmentInterval).isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test:" + 2 * segmentInterval).isDirectory());
         assertEquals(true, segment1.isOpen());
         assertEquals(true, segment2.isOpen());
         assertEquals(true, segment3.isOpen());
@@ -97,20 +100,20 @@ public class SegmentsTest {
     public void shouldNotCreateSegmentThatIsAlreadyExpired() {
         segments.getOrCreateSegment(7, context);
         assertNull(segments.getOrCreateSegment(0, context));
-        assertFalse(new File(context.stateDir(), "test/test-197001010000").exists());
+        assertFalse(new File(context.stateDir(), "test/test:0").exists());
     }
 
     @Test
     public void shouldCleanupSegmentsThatHaveExpired() {
         final Segment segment1 = segments.getOrCreateSegment(0, context);
-        final Segment segment2 = segments.getOrCreateSegment(0, context);
+        final Segment segment2 = segments.getOrCreateSegment(1, context);
         final Segment segment3 = segments.getOrCreateSegment(7, context);
         assertFalse(segment1.isOpen());
         assertFalse(segment2.isOpen());
         assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test-197001010000").exists());
-        assertFalse(new File(context.stateDir(), "test/test-197001010001").exists());
-        assertTrue(new File(context.stateDir(), "test/test-197001010007").exists());
+        assertFalse(new File(context.stateDir(), "test/test:0").exists());
+        assertFalse(new File(context.stateDir(), "test/test:" + segmentInterval).exists());
+        assertTrue(new File(context.stateDir(), "test/test:" + 7 * segmentInterval).exists());
     }
 
     @Test


Mime
View raw message