kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7101: Consider session store for windowed store default configs (#5298)
Date Tue, 03 Jul 2018 05:00:29 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8250738  KAFKA-7101: Consider session store for windowed store default configs (#5298)
8250738 is described below

commit 8250738ae41f6f0a87dc1e21e7623c5d69cae148
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Mon Jul 2 22:00:23 2018 -0700

    KAFKA-7101: Consider session store for windowed store default configs (#5298)
    
    1. extend isWindowStore to consider session store as well.
    2. extend the existing unit test accordingly.
    
    Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../internals/InternalTopologyBuilder.java         | 13 ++++++----
 .../streams/state/SessionBytesStoreSupplier.java   |  7 +++++
 .../RocksDbSessionBytesStoreSupplier.java          |  5 ++++
 .../state/internals/SessionStoreBuilder.java       |  4 +++
 .../internals/InternalTopologyBuilderTest.java     | 30 +++++++++++++++-------
 5 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index ed51754..c644f9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,8 +139,10 @@ public class InternalTopologyBuilder {
         }
 
         long retentionPeriod() {
-            if (isWindowStore()) {
+            if (builder instanceof WindowStoreBuilder) {
                 return ((WindowStoreBuilder) builder).retentionPeriod();
+            } else if (builder instanceof SessionStoreBuilder) {
+                return ((SessionStoreBuilder) builder).retentionPeriod();
             } else {
                 throw new IllegalStateException("retentionPeriod is not supported when not a window store");
             }
@@ -159,7 +162,7 @@ public class InternalTopologyBuilder {
         }
 
         private boolean isWindowStore() {
-            return builder instanceof WindowStoreBuilder;
+            return builder instanceof WindowStoreBuilder || builder instanceof SessionStoreBuilder;
         }
 
         // Apparently Java strips the generics from this method because we're using the raw type for builder,
@@ -226,7 +229,7 @@ public class InternalTopologyBuilder {
                                   final Deserializer<?> keyDeserializer,
                                   final Deserializer<?> valDeserializer) {
             super(name, NO_PREDECESSORS);
-            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
+            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<>();
             this.pattern = pattern;
             this.keyDeserializer = keyDeserializer;
             this.valDeserializer = valDeserializer;
@@ -316,7 +319,7 @@ public class InternalTopologyBuilder {
                 final String topic = ((StaticTopicNameExtractor) topicExtractor).topicName;
                 if (internalTopicNames.contains(topic)) {
                     // prefix the internal topic name with the application id
-                    return new SinkNode<>(name, new StaticTopicNameExtractor<K, V>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
+                    return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
                 } else {
                     return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner);
                 }
@@ -415,7 +418,7 @@ public class InternalTopologyBuilder {
             throw new TopologyException("Sink " + name + " must have at least one parent");
         }
 
-        addSink(name, new StaticTopicNameExtractor<K, V>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
+        addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
         nodeToSinkTopic.put(name, topic);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
index 04b0ceb..6954089 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
@@ -35,4 +35,11 @@ public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<By
      * @return segmentInterval in milliseconds
      */
     long segmentIntervalMs();
+
+    /**
+     * The time period for which the {@link SessionStore} will retain historic data.
+     *
+     * @return retentionPeriod
+     */
+    long retentionPeriod();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index 45df39c..5610fb2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -56,4 +56,9 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
         // Selected somewhat arbitrarily. Profiling may reveal a different value is preferable.
         return Math.max(retentionPeriod / 2, 60_000L);
     }
+
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
index 61919c3..b433895 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
@@ -60,4 +60,8 @@ public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, Sessio
         }
         return new ChangeLoggingSessionBytesStore(inner);
     }
+
+    public long retentionPeriod() {
+        return storeSupplier.retentionPeriod();
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index fb64130..78c217d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -532,24 +532,36 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-
         builder.addStateStore(
             Stores.windowStoreBuilder(
-                Stores.persistentWindowStore("store", 30_000L, 10_000L, false),
+                Stores.persistentWindowStore("store1", 30_000L, 10_000L, false),
                 Serdes.String(),
                 Serdes.String()
             ),
             "processor"
         );
+        builder.addStateStore(
+                Stores.sessionStoreBuilder(
+                        Stores.persistentSessionStore("store2", 30000), Serdes.String(), Serdes.String()
+                ),
+                "processor"
+        );
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(2, properties.size());
-        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
-        assertEquals("40000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
-        assertEquals("appId-store-changelog", topicConfig.name());
-        assertTrue(topicConfig instanceof WindowedChangelogTopicConfig);
+        final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
+        final Map<String, String> properties1 = topicConfig1.getProperties(Collections.<String, String>emptyMap(), 10000);
+        assertEquals(2, properties1.size());
+        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+        assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG));
+        assertEquals("appId-store1-changelog", topicConfig1.name());
+        assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig);
+        final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog");
+        final Map<String, String> properties2 = topicConfig2.getProperties(Collections.<String, String>emptyMap(), 10000);
+        assertEquals(2, properties2.size());
+        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+        assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG));
+        assertEquals("appId-store2-changelog", topicConfig2.name());
+        assertTrue(topicConfig2 instanceof WindowedChangelogTopicConfig);
     }
 
     @Test


Mime
View raw message