kafka-commits mailing list archives

From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6535: Set default retention ms for Streams repartition topics to Long.MAX_VALUE (#4730)
Date Mon, 30 Apr 2018 10:18:46 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6655a4d  KAFKA-6535: Set default retention ms for Streams repartition topics to Long.MAX_VALUE (#4730)
6655a4d is described below

commit 6655a4d75f8d839c9f87e68fbda77ed0637825d2
Author: khairy <khaireddine120@gmail.com>
AuthorDate: Mon Apr 30 11:18:40 2018 +0100

    KAFKA-6535: Set default retention ms for Streams repartition topics to Long.MAX_VALUE (#4730)
    
    Implements KIP-284
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 docs/streams/upgrade-guide.html                                   | 6 +++++-
 docs/upgrade.html                                                 | 3 ++-
 .../kafka/streams/processor/internals/RepartitionTopicConfig.java | 7 ++++---
 .../kafka/streams/integration/InternalTopicIntegrationTest.java   | 4 ++--
 .../org/apache/kafka/streams/processor/TopologyBuilderTest.java   | 3 ++-
 .../streams/processor/internals/InternalTopicConfigTest.java      | 8 ++++++++
 .../streams/processor/internals/InternalTopologyBuilderTest.java  | 3 ++-
 7 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 565bd0b..462824f 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -100,6 +100,7 @@
     </ul>
 
     <!-- TODO: verify release verion and update `id` and `href` attributes (also at other places that link to this headline) -->
+    
     <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API changes in 1.2.0</a></h3>
     <p>
         We have removed the <code>skippedDueToDeserializationError-rate</code> and <code>skippedDueToDeserializationError-total</code> metrics.
@@ -156,7 +157,10 @@
       The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
       Forwarding based on child index is not supported in the new API any longer.
     </p>
-
+    <p>
+        <a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.
+        Instead of relying on data retention Kafka Streams uses the new purge data API to delete consumed data from those topics and to keep used storage small now.
+    </p>
     <p>
       Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala.  It wraps core Kafka Streams DSL types to make it easier to call when
       interoperating with Scala code.  For example, it includes higher order functions as parameters for transformations avoiding the need anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11, automatic conversion between Java and Scala collection types, a way
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 08cc892..4fe7e20 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -75,6 +75,7 @@
         updated to aggregate across different versions.
     </li>
     <li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
+    <li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.</li>
 </ul>
 
 <h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol Versions</a></h5>
@@ -87,7 +88,6 @@
     <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_120">Streams API changes in 1.2.0</a> for more details. </li>
 </ul>
 
-
 <h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x or 1.0.x to 1.1.x</a></h4>
 <p>Kafka 1.1.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
     you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_110_notable">notable changes in 1.1.0</a> before upgrading.
@@ -132,6 +132,7 @@
         Hot-swaping the jar-file only might not work.</li>
 </ol>
 
+
 <!-- TODO add if 1.1.1 gets release
 <h5><a id="upgrade_111_notable" href="#upgrade_111_notable">Notable changes in 1.1.1</a></h5>
 <ul>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
index 1459310..ca8fbff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
@@ -33,9 +33,10 @@ public class RepartitionTopicConfig extends InternalTopicConfig {
     static {
         final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
         tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
-        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800");     // 50 MB
-        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800");           // 50 MB
-        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");                // 10 min
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800");               // 50 MB
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800");                     // 50 MB
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");                          // 10 min
+        tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE));  // Infinity
         REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 1469d18..2d9c8c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -164,7 +164,7 @@ public class InternalTopicIntegrationTest {
 
         final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition");
         assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
-        assertEquals(4, repartitionProps.size());
+        assertEquals(5, repartitionProps.size());
     }
 
     @Test
@@ -213,6 +213,6 @@ public class InternalTopicIntegrationTest {
 
         final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition");
         assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
-        assertEquals(4, repartitionProps.size());
+        assertEquals(5, repartitionProps.size());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index f948c2c..e3b888d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -588,7 +588,8 @@ public class TopologyBuilderTest {
         final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(4, properties.size());
+        assertEquals(5, properties.size());
+        assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals("appId-foo", topicConfig.name());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
index eaae352..ea7a926 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
@@ -57,4 +57,12 @@ public class InternalTopicConfigTest {
         assertEquals("1000", properties.get("retention.ms"));
         assertEquals("10000", properties.get("retention.bytes"));
     }
+
+    @Test
+    public void shouldUseSuppliedConfigsForRepartitionConfig() {
+        final Map<String, String> configs = new HashMap<>();
+        configs.put("retention.ms", "1000");
+        final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", configs);
+        assertEquals("1000", topicConfig.getProperties(Collections.<String, String>emptyMap(), 0).get(TopicConfig.RETENTION_MS_CONFIG));
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 25468c0..b3663fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -554,7 +554,8 @@ public class InternalTopologyBuilderTest {
         final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(4, properties.size());
+        assertEquals(5, properties.size());
+        assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("appId-foo", topicConfig.name());
         assertTrue(topicConfig instanceof RepartitionTopicConfig);
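
The new test shouldUseSuppliedConfigsForRepartitionConfig above checks that a supplied retention.ms still wins over the new Long.MAX_VALUE default. The sketch below is one way to picture that precedence without a Kafka dependency. It is a simplified model, not the actual RepartitionTopicConfig code: the class name RepartitionDefaultsSketch, the method effectiveConfig, and the ordering "library defaults, then global defaults, then per-topic overrides" are assumptions made for illustration. Only the five default keys and their values are taken from the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of how repartition topic defaults could be
// combined with user-supplied configs. Not the actual Kafka Streams class.
class RepartitionDefaultsSketch {

    // The five defaults set in the static block shown in the diff,
    // written with the literal topic config key names.
    static final Map<String, String> LIBRARY_DEFAULTS;

    static {
        final Map<String, String> tmp = new HashMap<>();
        tmp.put("cleanup.policy", "delete");
        tmp.put("segment.index.bytes", "52428800");               // 50 MB
        tmp.put("segment.bytes", "52428800");                     // 50 MB
        tmp.put("segment.ms", "600000");                          // 10 min
        tmp.put("retention.ms", String.valueOf(Long.MAX_VALUE));  // effectively infinite
        LIBRARY_DEFAULTS = Collections.unmodifiableMap(tmp);
    }

    // Assumed precedence (later entries win):
    // library defaults < global defaults < per-topic overrides.
    static Map<String, String> effectiveConfig(final Map<String, String> globalDefaults,
                                               final Map<String, String> topicOverrides) {
        final Map<String, String> result = new HashMap<>(LIBRARY_DEFAULTS);
        result.putAll(globalDefaults);
        result.putAll(topicOverrides);
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, String> none = Collections.emptyMap();

        // No user configs: the new default applies.
        System.out.println(effectiveConfig(none, none).get("retention.ms"));
        // prints 9223372036854775807

        // A supplied retention.ms replaces the default.
        final Map<String, String> supplied = Collections.singletonMap("retention.ms", "1000");
        System.out.println(effectiveConfig(none, supplied).get("retention.ms"));
        // prints 1000
    }
}
```

With no overrides the model yields five properties, which matches the updated assertEquals(5, ...) checks in the tests above.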

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.
