kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-6150: KIP-204 part III; Change repartition topic segment size and ms
Date Wed, 20 Dec 2017 00:05:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f3b9afe62 -> 82c6d429e


http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 0fdb575..e68f86b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
@@ -39,7 +40,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -377,29 +377,15 @@ public class InternalTopologyBuilderTest {
         expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
             Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
             Collections.<String, InternalTopicConfig>emptyMap(),
-            Collections.singletonMap(
-                store1,
-                new InternalTopicConfig(
-                    store1,
-                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                    Collections.<String, String>emptyMap()))));
+            Collections.singletonMap(store1, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store1, Collections.<String, String>emptyMap()))));
         expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
             Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
             Collections.<String, InternalTopicConfig>emptyMap(),
-            Collections.singletonMap(
-                store2,
-                new InternalTopicConfig(
-                    store2,
-                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                    Collections.<String, String>emptyMap()))));
+            Collections.singletonMap(store2, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store2, Collections.<String, String>emptyMap()))));
         expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
             Collections.<String>emptySet(), mkSet("topic-5"),
             Collections.<String, InternalTopicConfig>emptyMap(),
-            Collections.singletonMap(store3,
-                new InternalTopicConfig(
-                    store3,
-                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
-                    Collections.<String, String>emptyMap()))));
+            Collections.singletonMap(store3, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store3, Collections.<String, String>emptyMap()))));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -520,7 +506,7 @@ public class InternalTopologyBuilderTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() throws Exception {
+    public void shouldAddInternalTopicConfigForWindowStores() throws Exception {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -528,44 +514,44 @@ public class InternalTopologyBuilderTest {
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
-        final Properties properties = topicConfig.toProperties(0);
-        final List<String> policies = Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(","));
-        assertEquals("appId-store-changelog", topicConfig.name());
-        assertTrue(policies.contains("compact"));
-        assertTrue(policies.contains("delete"));
-        assertEquals(2, policies.size());
-        assertEquals("30000", properties.getProperty(InternalTopicManager.RETENTION_MS));
+        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
         assertEquals(2, properties.size());
+        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+        assertEquals("40000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
+        assertEquals("appId-store-changelog", topicConfig.name());
+        assertTrue(topicConfig instanceof WindowedChangelogTopicConfig);
     }
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception {
+    public void shouldAddInternalTopicConfigForNonWindowStores() throws Exception {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(new MockStateStoreSupplier("name", true), "processor");
+        builder.addStateStore(new MockStateStoreSupplier("store", true), "processor");
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-name-changelog");
-        final Properties properties = topicConfig.toProperties(0);
-        assertEquals("appId-name-changelog", topicConfig.name());
-        assertEquals("compact", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
+        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
         assertEquals(1, properties.size());
+        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+        assertEquals("appId-store-changelog", topicConfig.name());
+        assertTrue(topicConfig instanceof UnwindowedChangelogTopicConfig);
     }
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception {
+    public void shouldAddInternalTopicConfigForRepartitionTopics() throws Exception {
         builder.setApplicationId("appId");
         builder.addInternalTopic("foo");
         builder.addSource(null, "source", null, null, null, "foo");
         final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
-        final Properties properties = topicConfig.toProperties(0);
+        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
+        assertEquals(4, properties.size());
+        assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("appId-foo", topicConfig.name());
-        assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
-        assertEquals(1, properties.size());
+        assertTrue(topicConfig instanceof RepartitionTopicConfig);
     }
 
     @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/kafka/blob/82c6d429/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index 3db7e53..ae73280 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -38,7 +38,7 @@ public class MockInternalTopicManager extends InternalTopicManager {
 
     public MockInternalTopicManager(final StreamsConfig streamsConfig,
                                     final MockConsumer<byte[], byte[]> restoreConsumer) {
-        super(KafkaAdminClient.create(streamsConfig.originals()), streamsConfig.originals());
+        super(KafkaAdminClient.create(streamsConfig.originals()), streamsConfig);
 
         this.restoreConsumer = restoreConsumer;
     }


Mime
View raw message