kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4241: StreamsConfig doesn't pass through custom consumer and producer properties to ConsumerConfig and ProducerConfig
Date Sun, 02 Oct 2016 04:57:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 33c7b88ff -> c78a3b173


KAFKA-4241: StreamsConfig doesn't pass through custom consumer and producer properties to
ConsumerConfig and ProducerConfig

Pass through user-defined consumer and producer properties from StreamsConfig to ConsumerConfig
and ProducerConfig.
This allows, for example, consumer interceptor support, e.g. consumer.interceptor.classes=SomeClass

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1943 from dguy/interceptor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c78a3b17
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c78a3b17
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c78a3b17

Branch: refs/heads/trunk
Commit: c78a3b173d019e8bb58f4e31c94bf4dd3f7ab614
Parents: 33c7b88
Author: Damian Guy <damian.guy@gmail.com>
Authored: Sat Oct 1 21:57:49 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Oct 1 21:57:49 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 63 ++++++++++----------
 .../apache/kafka/streams/StreamsConfigTest.java | 24 ++++++++
 2 files changed, 54 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c78a3b17/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6c88b11..66c15b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -321,7 +321,7 @@ public class StreamsConfig extends AbstractConfig {
      * @throws ConfigException
      */
     public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String
groupId, String clientId) throws ConfigException {
-        final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX);
+        final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX,
ConsumerConfig.configNames());
 
         // disable auto commit and throw exception if there is user overridden values,
         // this is necessary for streams commit semantics
@@ -330,26 +330,26 @@ public class StreamsConfig extends AbstractConfig {
                     + ", as the streams client will always turn off auto committing.");
         }
 
-        // generate consumer configs from original properties and overridden maps
-        Map<String, Object> props = clientProps(ConsumerConfig.configNames(), consumerProps,
CONSUMER_DEFAULT_OVERRIDES);
+        consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
 
         // bootstrap.servers should be from StreamsConfig
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
         // add client id with stream client id prefix, and group id
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
 
         // add configs required for stream partition assignor
-        props.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
-        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
-        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
-        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
-        props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
-        if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals(""))
-            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
-
-        props.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
-        return props;
+        consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
+        consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
+        consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
+        consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
+        if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {
+            consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
+        }
+
+        consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
+        return consumerProps;
     }
 
 
@@ -362,7 +362,7 @@ public class StreamsConfig extends AbstractConfig {
      * @throws ConfigException
      */
     public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException
{
-        Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX);
+        Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX,
ConsumerConfig.configNames());
 
         // disable auto commit and throw exception if there is user overridden values,
         // this is necessary for streams commit semantics
@@ -371,19 +371,18 @@ public class StreamsConfig extends AbstractConfig {
                     + ", as the streams client will always turn off auto committing.");
         }
 
-        // generate consumer configs from original properties and overridden maps
-        Map<String, Object> props = clientProps(ConsumerConfig.configNames(), consumerProps,
CONSUMER_DEFAULT_OVERRIDES);
+        consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
 
         // bootstrap.servers should be from StreamsConfig
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
 
         // no need to set group id for a restore consumer
-        props.remove(ConsumerConfig.GROUP_ID_CONFIG);
+        consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
 
         // add client id with stream client id prefix
-        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
+        consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
 
-        return props;
+        return consumerProps;
     }
 
 
@@ -397,7 +396,9 @@ public class StreamsConfig extends AbstractConfig {
      */
     public Map<String, Object> getProducerConfigs(String clientId) {
         // generate producer configs from original properties and overridden maps
-        Map<String, Object> props = clientProps(ProducerConfig.configNames(), getClientPropsWithPrefix(PRODUCER_PREFIX),
PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> props = getClientPropsWithPrefix(PRODUCER_PREFIX,
ProducerConfig.configNames());
+        props.putAll(PRODUCER_DEFAULT_OVERRIDES);
+
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
         // add client id with stream client id prefix
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
@@ -405,9 +406,8 @@ public class StreamsConfig extends AbstractConfig {
         return props;
     }
 
-    private Map<String, Object> getClientPropsWithPrefix(final String prefix) {
-        // To be backward compatible we first get all the originals.
-        final Map<String, Object> props = this.originals();
+    private Map<String, Object> getClientPropsWithPrefix(final String prefix, final
Set<String> configNames) {
+        final Map<String, Object> props = clientProps(configNames, originals());
         props.putAll(this.originalsWithPrefix(prefix));
         return props;
     }
@@ -433,22 +433,19 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
-     * Filter configs that are not defined in the given set of configuration names.
+     * Override any client properties in the original configs with overrides
      *
      * @param configNames The given set of configuration names.
      * @param originals The original configs to be filtered.
-     * @param overrides The default overridden values.
-     * @return Filtered configs.
+     * @return client config with any overrides
      */
-    private Map<String, Object> clientProps(Set<String> configNames, Map<String,
Object> originals, Map<String, Object> overrides) {
+    private Map<String, Object> clientProps(Set<String> configNames, Map<String,
Object> originals) {
         // iterate all client config names, filter out non-client configs from the original
         // property map and use the overridden values when they are not specified by users
         Map<String, Object> parsed = new HashMap<>();
         for (String configName: configNames) {
             if (originals.containsKey(configName)) {
                 parsed.put(configName, originals.get(configName));
-            } else if (overrides.containsKey(configName)) {
-                parsed.put(configName, overrides.get(configName));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c78a3b17/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 5f36b9d..3caa767 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -133,6 +133,30 @@ public class StreamsConfigTest {
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
+    @Test
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception
{
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put(consumerPrefix("interceptor.statsd.host"), "host");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null,
"groupId", "clientId");
+        assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
+    }
+
+    @Test
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws
Exception {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put(consumerPrefix("interceptor.statsd.host"), "host");
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+        assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
+    }
+
+    @Test
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception
{
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put(producerPrefix("interceptor.statsd.host"), "host");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        assertEquals("host", producerConfigs.get("interceptor.statsd.host"));
+    }
+
 
     @Test
     public void shouldSupportPrefixedProducerConfigs() throws Exception {


Mime
View raw message