kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3786: Let ConfigDef filter property key value pairs
Date Mon, 13 Jun 2016 21:15:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b058fcf5e -> b8ea094b4


KAFKA-3786: Let ConfigDef filter property key value pairs

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1465 from guozhangwang/K3786-config-parsing


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8ea094b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8ea094b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8ea094b

Branch: refs/heads/trunk
Commit: b8ea094b427768bb360d87fc0a07f670cb667e1e
Parents: b058fcf
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Jun 13 14:15:28 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jun 13 14:15:28 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |   5 +
 .../kafka/clients/producer/ProducerConfig.java  |   5 +
 .../org/apache/kafka/streams/StreamsConfig.java | 120 ++++++++++++-------
 .../apache/kafka/streams/StreamsConfigTest.java |  10 +-
 4 files changed, 93 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ea094b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index e58f2fd..b7fc1d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
@@ -376,6 +377,10 @@ public class ConsumerConfig extends AbstractConfig {
         super(CONFIG, props);
     }
 
+    public static Set<String> configNames() {
+        return CONFIG.names();
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ea094b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index e505f71..47eb309 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
@@ -336,6 +337,10 @@ public class ProducerConfig extends AbstractConfig {
         super(CONFIG, props);
     }
 
+    public static Set<String> configNames() {
+        return CONFIG.names();
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ea094b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 95e55c9..7f32434 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor;
@@ -31,7 +32,10 @@ import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 
@@ -212,6 +216,28 @@ public class StreamsConfig extends AbstractConfig {
                                         CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
     }
 
+    // this is the list of configs for underlying clients
+    // that streams prefer different default values
+    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    static
+    {
+        Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
+        tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+
+        PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
+    static
+    {
+        Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
+        tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
+        tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+    }
+
     public static class InternalConfig {
         public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
     }
@@ -220,8 +246,18 @@ public class StreamsConfig extends AbstractConfig {
         super(CONFIG, props);
     }
 
-    public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
-        Map<String, Object> props = getBaseConsumerConfigs();
+    public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
+        Map<String, Object> originals = this.originals();
+
+        // disable auto commit and throw exception if there is user overridden values,
+        // this is necessary for streams commit semantics
+        if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                    + ", as the streams client will always turn off auto committing.");
+        }
+
+        // generate consumer configs from original properties and overridden maps
+        Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES);
 
         // add client id with stream client id prefix, and group id
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -232,15 +268,24 @@ public class StreamsConfig extends AbstractConfig {
         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
-
         if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals(""))
             props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
 
         return props;
     }
 
-    public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
-        Map<String, Object> props = getBaseConsumerConfigs();
+    public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException {
+        Map<String, Object> originals = this.originals();
+
+        // disable auto commit and throw exception if there is user overridden values,
+        // this is necessary for streams commit semantics
+        if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                    + ", as the streams client will always turn off auto committing.");
+        }
+
+        // generate consumer configs from original properties and overridden maps
+        Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES);
 
         // no need to set group id for a restore consumer
         props.remove(ConsumerConfig.GROUP_ID_CONFIG);
@@ -251,29 +296,9 @@ public class StreamsConfig extends AbstractConfig {
         return props;
     }
 
-    private Map<String, Object> getBaseConsumerConfigs() {
-        Map<String, Object> props = this.originals();
-
-        // remove streams properties
-        removeStreamsSpecificConfigs(props);
-
-        // set consumer default property values
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        return props;
-    }
-
     public Map<String, Object> getProducerConfigs(String clientId) {
-        Map<String, Object> props = this.originals();
-
-        // remove consumer properties that are not required for producers
-        props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-
-        // remove streams properties
-        removeStreamsSpecificConfigs(props);
-
-        // set producer default property values
-        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+        // generate producer configs from original properties and overridden maps
+        Map<String, Object> props = clientProps(ProducerConfig.configNames(), this.originals(), PRODUCER_DEFAULT_OVERRIDES);
 
         // add client id with stream client id prefix
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
@@ -281,24 +306,6 @@ public class StreamsConfig extends AbstractConfig {
         return props;
     }
 
-    private void removeStreamsSpecificConfigs(Map<String, Object> props) {
-        props.remove(StreamsConfig.POLL_MS_CONFIG);
-        props.remove(StreamsConfig.STATE_DIR_CONFIG);
-        props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
-        props.remove(StreamsConfig.KEY_SERDE_CLASS_CONFIG);
-        props.remove(StreamsConfig.VALUE_SERDE_CLASS_CONFIG);
-        props.remove(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
-        props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG);
-        props.remove(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
-        props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
-        props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
-        props.remove(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
-        props.remove(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG);
-        props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-        props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-        props.remove(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
-    }
-
     public Serde keySerde() {
         Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class);
         serde.configure(originals(), true);
@@ -313,6 +320,29 @@ public class StreamsConfig extends AbstractConfig {
         return serde;
     }
 
+    /**
+     * Filter configs that are not defined in the given set of configuration names.
+     *
+     * @param configNames The given set of configuration names.
+     * @param originals The original configs to be filtered.
+     * @param overrides The default overridden values.
+     * @return Filtered configs.
+     */
+    private Map<String, Object> clientProps(Set<String> configNames, Map<String, Object> originals, Map<String, Object> overrides) {
+        // iterate all client config names, filter out non-client configs from the original
+        // property map and use the overridden values when they are not specified by users
+        Map<String, Object> parsed = new HashMap<>();
+        for (String configName: configNames) {
+            if (originals.containsKey(configName)) {
+                parsed.put(configName, originals.get(configName));
+            } else if (overrides.containsKey(configName)) {
+                parsed.put(configName, overrides.get(configName));
+            }
+        }
+
+        return parsed;
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ea094b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 17d6b4b..3d4a9cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
@@ -44,6 +45,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.put("DUMMY", "dummy");
         props.put("key.deserializer.encoding", "UTF8");
         props.put("value.deserializer.encoding", "UTF-16");
         streamsConfig = new StreamsConfig(props);
@@ -52,7 +54,9 @@ public class StreamsConfigTest {
     @Test
     public void testGetProducerConfigs() throws Exception {
         Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
-        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer");
+        assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer");
+        assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
+        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -60,7 +64,8 @@ public class StreamsConfigTest {
         Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
-
+        assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
+        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -68,6 +73,7 @@ public class StreamsConfigTest {
         Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
+        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test


Mime
View raw message