kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6657: Add StreamsConfig prefix for different consumers (#4805)
Date Wed, 02 May 2018 20:24:19 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1b170df  KAFKA-6657: Add StreamsConfig prefix for different consumers (#4805)
1b170df is described below

commit 1b170df31c7304f6d4c938b5e0c2a09ae1e9189d
Author: Boyang Chen <bchen11@outlook.com>
AuthorDate: Wed May 2 13:24:15 2018 -0700

    KAFKA-6657: Add StreamsConfig prefix for different consumers (#4805)
    
    This pull request is for JIRA 6657, for KIP-276.
    
    Added unit tests for new getGlobalConsumerConfigs API and make sure existing restore consumer
tests are passing.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>
---
 docs/streams/developer-guide/config-streams.html   |  20 ++-
 .../apache/kafka/streams/KafkaClientSupplier.java  |  20 ++-
 .../org/apache/kafka/streams/KafkaStreams.java     |   2 +-
 .../org/apache/kafka/streams/StreamsConfig.java    | 144 +++++++++++++++++++--
 .../internals/DefaultKafkaClientSupplier.java      |  11 +-
 .../streams/processor/internals/StreamThread.java  |   2 +-
 .../apache/kafka/streams/StreamsConfigTest.java    |  96 ++++++++++++--
 .../org/apache/kafka/test/MockClientSupplier.java  |   4 +
 8 files changed, 266 insertions(+), 33 deletions(-)
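
The override precedence this commit documents for the new prefixes (specific consumer prefix, then consumer., then the unprefixed name) can be illustrated with a small standalone sketch. This is not the StreamsConfig implementation: the class PrefixPrecedenceSketch and its methods are hypothetical names introduced only for illustration, and the sketch ignores producer and admin prefixes, known-config-name filtering, and the special handling of bootstrap.servers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of Kafka.
public class PrefixPrecedenceSketch {

    static final String CONSUMER_PREFIX = "consumer.";
    static final String[] KNOWN_PREFIXES = {
        "consumer.", "main.consumer.", "restore.consumer.", "global.consumer."
    };

    // Collect the entries whose key starts with the prefix, with the prefix stripped.
    static Map<String, Object> withPrefix(final Map<String, Object> originals, final String prefix) {
        final Map<String, Object> result = new HashMap<>();
        for (final Map.Entry<String, Object> entry : originals.entrySet()) {
            final String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // True if the key carries one of the consumer prefixes this sketch knows about.
    static boolean hasKnownPrefix(final String key) {
        for (final String prefix : KNOWN_PREFIXES) {
            if (key.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    // Resolve configs for one consumer type, lowest precedence first so that
    // later puts overwrite earlier ones.
    static Map<String, Object> resolve(final Map<String, Object> originals, final String specificPrefix) {
        final Map<String, Object> resolved = new HashMap<>();
        // 3. [config-name]
        for (final Map.Entry<String, Object> entry : originals.entrySet()) {
            if (!hasKnownPrefix(entry.getKey())) {
                resolved.put(entry.getKey(), entry.getValue());
            }
        }
        // 2. consumer.[config-name]
        resolved.putAll(withPrefix(originals, CONSUMER_PREFIX));
        // 1. e.g. restore.consumer.[config-name]
        resolved.putAll(withPrefix(originals, specificPrefix));
        return resolved;
    }

    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", "1");
        props.put("consumer.max.poll.records", "5");
        props.put("restore.consumer.max.poll.records", "50");

        // The restore consumer sees its specific override; the global consumer
        // falls back to the general consumer. value.
        System.out.println(resolve(props, "restore.consumer.").get("max.poll.records"));
        System.out.println(resolve(props, "global.consumer.").get("max.poll.records"));
    }
}
```

Applying the layers from lowest to highest precedence keeps the merge a plain sequence of map writes, which mirrors how the new getters in this commit copy the prefixed overrides on top of the common consumer configs.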

diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index fd0cbca..e3cae22 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -516,7 +516,7 @@
           <h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink"
href="#naming" title="Permalink to this headline"></a></h4>
           <p>Some consumer and producer configuration parameters use the same parameter
name. For example, <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code>
and
             <code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code>
are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code>
and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code>
control retries
-            for client request. You can avoid duplicate names by prefix parameter names with
<code class="docutils literal"><span class="pre">consumer.</span></code>
or <code class="docutils literal"><span class="pre">producer</span></code>
(e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code>
and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
+            for client request. You can avoid duplicate names by prefix parameter names with
<code class="docutils literal"><span class="pre">consumer.</span></code>
or <code class="docutils literal"><span class="pre">producer.</span></code>
(e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code>
and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
           <div class="highlight-java"><div class="highlight"><pre><span></span><span
class="n">Properties</span> <span class="n">streamsSettings</span> <span
class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span
class="o">();</span>
 <span class="c1">// same value for consumer and producer</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span
class="o">,</span> <span class="s">&quot;value&quot;</span><span
class="o">);</span>
@@ -527,6 +527,24 @@
 <span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span
class="o">.</span><span class="na">consumerPrefix</span><span class="o">(</span><span
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span>
<span class="s">&quot;consumer-value&quot;</span><span class="o">);</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span
class="o">.</span><span class="na">producerPrefix</span><span class="o">(</span><span
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span>
<span class="s">&quot;producer-value&quot;</span><span class="o">);</span>
 </pre></div>
+          <p>You could further separate consumer configuration by adding different
prefixes:</p>
+          <ul class="simple">
+            <li><code class="docutils literal"><span class="pre">main.consumer.</span></code>
for main consumer which is the default consumer of stream source.</li>
+            <li><code class="docutils literal"><span class="pre">restore.consumer.</span></code>
for restore consumer which is in charge of state store recovery.</li>
+            <li><code class="docutils literal"><span class="pre">global.consumer.</span></code>
for global consumer which is used in global KTable construction.</li>
+          </ul>
+          <p>For example, if you only want to set restore consumer config without touching
other consumers' settings, you could simply use <code class="docutils literal"><span
class="pre">restore.consumer.</span></code> to set the config.</p>
+          <div class="highlight-java"><div class="highlight"><pre><span></span><span
class="n">Properties</span> <span class="n">streamsSettings</span> <span
class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span
class="o">();</span>
+<span class="c1">// same config value for all consumer types</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span class="s">&quot;consumer.PARAMETER_NAME&quot;</span><span
class="o">,</span> <span class="s">&quot;general-consumer-value&quot;</span><span
class="o">);</span>
+<span class="c1">// set a different restore consumer config. This would make restore
consumer take restore-consumer-value,</span>
+<span class="c1">// while main consumer and global consumer stay with general-consumer-value</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span class="s">&quot;restore.consumer.PARAMETER_NAME&quot;</span><span
class="o">,</span> <span class="s">&quot;restore-consumer-value&quot;</span><span
class="o">);</span>
+<span class="c1">// alternatively, you can use</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span
class="o">.</span><span class="na">restoreConsumerPrefix</span><span
class="o">(</span><span class="s">&quot;PARAMETER_NAME&quot;</span><span
class="o">),</span> <span class="s">&quot;restore-consumer-value&quot;</span><span
class="o">);</span>
+</pre></div>
+          </div>
+          <p>The same applies to <code class="docutils literal"><span class="pre">main.consumer.</span></code>
and <code class="docutils literal"><span class="pre">global.consumer.</span></code>,
if you only want to specify one consumer type config.</p>
           </div>
         </div>
         <div class="section" id="default-values">
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 8a6ec05..888edf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.Map;
@@ -32,7 +33,7 @@ public interface KafkaClientSupplier {
     /**
      * Create an {@link AdminClient} which is used for internal topic management.
      *
-     * @param config Supplied by the {@link StreamsConfig} given to the {@link KafkaStreams}
+     * @param config Supplied by the {@link java.util.Properties} given to the {@link KafkaStreams}
      * @return an instance of {@link AdminClient}
      */
     AdminClient getAdminClient(final Map<String, Object> config);
@@ -41,7 +42,7 @@ public interface KafkaClientSupplier {
      * Create a {@link Producer} which is used to write records to sink topics.
      *
      * @param config {@link StreamsConfig#getProducerConfigs(String) producer config} which
is supplied by the
-     *               {@link StreamsConfig} given to the {@link KafkaStreams} instance
+     *               {@link java.util.Properties} given to the {@link KafkaStreams} instance
      * @return an instance of Kafka producer
      */
     Producer<byte[], byte[]> getProducer(final Map<String, Object> config);
@@ -49,8 +50,8 @@ public interface KafkaClientSupplier {
     /**
      * Create a {@link Consumer} which is used to read records of source topics.
      *
-     * @param config {@link StreamsConfig#getConsumerConfigs(String, String) consumer config}
which is
-     *               supplied by the {@link StreamsConfig} given to the {@link KafkaStreams}
instance
+     * @param config {@link StreamsConfig#getMainConsumerConfigs(String, String) consumer
config} which is
+     *               supplied by the {@link java.util.Properties} given to the {@link KafkaStreams}
instance
      * @return an instance of Kafka consumer
      */
     Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config);
@@ -59,8 +60,17 @@ public interface KafkaClientSupplier {
      * Create a {@link Consumer} which is used to read records to restore {@link StateStore}s.
      *
      * @param config {@link StreamsConfig#getRestoreConsumerConfigs(String) restore consumer
config} which is supplied
-     *               by the {@link StreamsConfig} given to the {@link KafkaStreams}
+     *               by the {@link java.util.Properties} given to the {@link KafkaStreams}
      * @return an instance of Kafka consumer
      */
     Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config);
+
+    /**
+     * Create a {@link Consumer} which is used to consume records for {@link GlobalKTable}.
+     *
+     * @param config {@link StreamsConfig#getGlobalConsumerConfigs(String) global consumer
config} which is supplied
+     *               by the {@link java.util.Properties} given to the {@link KafkaStreams}
+     * @return an instance of Kafka consumer
+     */
+    Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 776dde7..66a8934 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -708,7 +708,7 @@ public class KafkaStreams {
             final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
-                                                        clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId
+ "-global")),
+                                                        clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
                                                         stateDirectory,
                                                         cacheSizePerThread,
                                                         metrics,
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 653243c..18dc891 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -155,6 +155,33 @@ public class StreamsConfig extends AbstractConfig {
     public static final String CONSUMER_PREFIX = "consumer.";
 
     /**
+     * Prefix used to override {@link KafkaConsumer consumer} configs for the main consumer
client from
+     * the general consumer client configs. The override precedence is the following (from
highest to lowest precedence):
+     * 1. main.consumer.[config-name]
+     * 2. consumer.[config-name]
+     * 3. [config-name]
+     */
+    public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
+
+    /**
+     * Prefix used to override {@link KafkaConsumer consumer} configs for the restore consumer
client from
+     * the general consumer client configs. The override precedence is the following (from
highest to lowest precedence):
+     * 1. restore.consumer.[config-name]
+     * 2. consumer.[config-name]
+     * 3. [config-name]
+     */
+    public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
+
+    /**
+     * Prefix used to override {@link KafkaConsumer consumer} configs for the global consumer
client from
+     * the general consumer client configs. The override precedence is the following (from
highest to lowest precedence):
+     * 1. global.consumer.[config-name]
+     * 2. consumer.[config-name]
+     * 3. [config-name]
+     */
+    public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
+
+    /**
      * Prefix used to isolate {@link KafkaProducer producer} configs from other client configs.
      * It is recommended to use {@link #producerPrefix(String)} to add this prefix to {@link
ProducerConfig producer
      * properties}.
@@ -640,6 +667,39 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
+     * Prefix a property with {@link #MAIN_CONSUMER_PREFIX}. This is used to isolate {@link
ConsumerConfig main consumer configs}
+     * from other client configs.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #MAIN_CONSUMER_PREFIX} + {@code consumerProp}
+     */
+    public static String mainConsumerPrefix(final String consumerProp) {
+        return MAIN_CONSUMER_PREFIX + consumerProp;
+    }
+
+    /**
+     * Prefix a property with {@link #RESTORE_CONSUMER_PREFIX}. This is used to isolate {@link
ConsumerConfig restore consumer configs}
+     * from other client configs.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #RESTORE_CONSUMER_PREFIX} + {@code consumerProp}
+     */
+    public static String restoreConsumerPrefix(final String consumerProp) {
+        return RESTORE_CONSUMER_PREFIX + consumerProp;
+    }
+
+    /**
+     * Prefix a property with {@link #GLOBAL_CONSUMER_PREFIX}. This is used to isolate {@link
ConsumerConfig global consumer configs}
+     * from other client configs.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #GLOBAL_CONSUMER_PREFIX} + {@code consumerProp}
+     */
+    public static String globalConsumerPrefix(final String consumerProp) {
+        return GLOBAL_CONSUMER_PREFIX + consumerProp;
+    }
+
+    /**
      * Prefix a property with {@link #PRODUCER_PREFIX}. This is used to isolate {@link ProducerConfig
producer configs}
      * from other client configs.
      *
@@ -771,10 +831,37 @@ public class StreamsConfig extends AbstractConfig {
      * @param groupId      consumer groupId
      * @param clientId     clientId
      * @return Map of the consumer configuration.
+     * @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String)}
      */
+    @Deprecated
     public Map<String, Object> getConsumerConfigs(final String groupId,
                                                   final String clientId) {
-        final Map<String, Object> consumerProps = getCommonConsumerConfigs();
+        return getMainConsumerConfigs(groupId, clientId);
+    }
+
+    /**
+     * Get the configs to the {@link KafkaConsumer main consumer}.
+     * Properties using the prefix {@link #MAIN_CONSUMER_PREFIX} will be used in favor over
+     * the properties prefixed with {@link #CONSUMER_PREFIX} and the non-prefixed versions
+     * (read the override precedence ordering in {@link #MAIN_CONSUMER_PREFIX})
+     * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always
use the non-prefixed
+     * version as we only support reading/writing from/to the same Kafka Cluster.
+     * If not specified by {@link #MAIN_CONSUMER_PREFIX}, main consumer will share the general
consumer configs
+     * prefixed by {@link #CONSUMER_PREFIX}.
+     *
+     * @param groupId      consumer groupId
+     * @param clientId     clientId
+     * @return Map of the consumer configuration.
+     */
+    public Map<String, Object> getMainConsumerConfigs(final String groupId,
+                                                      final String clientId) {
+        Map<String, Object> consumerProps = getCommonConsumerConfigs();
+
+        // Get main consumer override configs
+        Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
+        for (Map.Entry<String, Object> entry: mainConsumerProps.entrySet()) {
+            consumerProps.put(entry.getKey(), entry.getValue());
+        }
 
         // add client id with stream client id prefix, and group id
         consumerProps.put(APPLICATION_ID_CONFIG, groupId);
@@ -821,23 +908,64 @@ public class StreamsConfig extends AbstractConfig {
 
     /**
      * Get the configs for the {@link KafkaConsumer restore-consumer}.
-     * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their
non-prefixed versions
+     * Properties using the prefix {@link #RESTORE_CONSUMER_PREFIX} will be used in favor
over
+     * the properties prefixed with {@link #CONSUMER_PREFIX} and the non-prefixed versions
+     * (read the override precedence ordering in {@link #RESTORE_CONSUMER_PREFIX})
      * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always
use the non-prefixed
      * version as we only support reading/writing from/to the same Kafka Cluster.
+     * If not specified by {@link #RESTORE_CONSUMER_PREFIX}, restore consumer will share
the general consumer configs
+     * prefixed by {@link #CONSUMER_PREFIX}.
      *
      * @param clientId clientId
-     * @return Map of the consumer configuration.
+     * @return Map of the restore consumer configuration.
      */
     public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {
-        final Map<String, Object> consumerProps = getCommonConsumerConfigs();
+        Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+
+        // Get restore consumer override configs
+        Map<String, Object> restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
+        for (Map.Entry<String, Object> entry: restoreConsumerProps.entrySet()) {
+            baseConsumerProps.put(entry.getKey(), entry.getValue());
+        }
 
         // no need to set group id for a restore consumer
-        consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+        baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
         // add client id with stream client id prefix
-        consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
-        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+        baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
+        baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
 
-        return consumerProps;
+        return baseConsumerProps;
+    }
+
+    /**
+     * Get the configs for the {@link KafkaConsumer global consumer}.
+     * Properties using the prefix {@link #GLOBAL_CONSUMER_PREFIX} will be used in favor
over
+     * the properties prefixed with {@link #CONSUMER_PREFIX} and the non-prefixed versions
+     * (read the override precedence ordering in {@link #GLOBAL_CONSUMER_PREFIX})
+     * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always
use the non-prefixed
+     * version as we only support reading/writing from/to the same Kafka Cluster.
+     * If not specified by {@link #GLOBAL_CONSUMER_PREFIX}, global consumer will share the
general consumer configs
+     * prefixed by {@link #CONSUMER_PREFIX}.
+     *
+     * @param clientId clientId
+     * @return Map of the global consumer configuration.
+     */
+    public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
+        Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+
+        // Get global consumer override configs
+        Map<String, Object> globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
+        for (Map.Entry<String, Object> entry: globalConsumerProps.entrySet()) {
+            baseConsumerProps.put(entry.getKey(), entry.getValue());
+        }
+
+        // no need to set group id for a global consumer
+        baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+        // add client id with stream client id prefix
+        baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer");
+        baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
+        return baseConsumerProps;
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
index 6f01e2f..69331b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -35,17 +35,22 @@ public class DefaultKafkaClientSupplier implements KafkaClientSupplier
{
     }
 
     @Override
-    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
+    public Producer<byte[], byte[]> getProducer(final Map<String, Object> config)
{
         return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
     }
 
     @Override
-    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
+    public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config)
{
         return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 
     @Override
-    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config)
{
+    public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object>
config) {
+        return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
+
+    @Override
+    public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object>
config) {
         return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e4ad138..cc5a07f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -641,7 +641,7 @@ public class StreamThread extends Thread {
 
         log.info("Creating consumer client");
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        final Map<String, Object> consumerConfigs = config.getConsumerConfigs(applicationId,
threadClientId);
+        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId,
threadClientId);
         consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR,
taskManager);
         String originalReset = null;
         if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals(""))
{
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ac82c04..e991b6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -96,7 +96,7 @@ public class StreamsConfigTest {
     public void testGetConsumerConfigs() {
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId,
clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId,
clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
         assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
@@ -115,7 +115,7 @@ public class StreamsConfigTest {
 
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId,
clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId,
clientId);
 
         assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
         assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
@@ -135,12 +135,22 @@ public class StreamsConfigTest {
 
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(groupId,
clientId);
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId,
clientId);
 
         assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
     }
 
     @Test
+    public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
+        props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
"50");
+        final String groupId = "example-application";
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId,
clientId);
+        assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test
     public void testGetRestoreConsumerConfigs() {
         final String clientId = "client";
         final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
@@ -185,7 +195,7 @@ public class StreamsConfigTest {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId",
"clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId",
"clientId");
         assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -202,7 +212,7 @@ public class StreamsConfigTest {
     public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId",
"clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId",
"clientId");
         assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
     }
 
@@ -238,7 +248,7 @@ public class StreamsConfigTest {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId",
"clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId",
"clientId");
         assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -265,7 +275,7 @@ public class StreamsConfigTest {
     public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put("custom.property.host", "host");
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId",
"clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId",
"clientId");
         final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
         final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
@@ -282,7 +292,7 @@ public class StreamsConfigTest {
         props.put(consumerPrefix("custom.property.host"), "host1");
         props.put(producerPrefix("custom.property.host"), "host2");
         props.put(adminClientPrefix("custom.property.host"), "host3");
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId",
"clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId",
"clientId");
         final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
         final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
@@ -319,7 +329,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId",
"clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId",
"clientId");
         assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
@@ -344,7 +354,7 @@ public class StreamsConfigTest {
     public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("a", "b");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b");
         assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
@@ -357,9 +367,65 @@ public class StreamsConfigTest {
     }
 
     @Test
+    public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
+        props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
+        final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("clientId");
+        assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test
+    public void testGetGlobalConsumerConfigs() {
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId);
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-global-consumer");
+        assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
+    }
+
+    @Test
+    public void shouldSupportPrefixedGlobalConsumerConfigs() {
+        props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put(consumerPrefix("interceptor.statsd.host"), "host");
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
+        assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
+    }
+
+    @Test
+    public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
+        props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("groupId");
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("client");
+        assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    }
+
+    @Test
+    public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
+        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
+        final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs("clientId");
+        assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test
     public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
     }
 
@@ -388,7 +454,9 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
+        String isoLevel = (String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
+        String name = READ_COMMITTED.name();
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -396,7 +464,7 @@ public class StreamsConfigTest {
     public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientrId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientrId");
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -423,7 +491,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
 
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 1ec28fa..d3430f2 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -83,4 +83,8 @@ public class MockClientSupplier implements KafkaClientSupplier {
         return restoreConsumer;
     }
 
+    @Override
+    public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
+        return restoreConsumer;
+    }
 }
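
The override tests in the diff above (for example testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix) rely on a more specific prefix taking precedence over the generic consumer prefix. The following is a minimal standalone sketch of that precedence rule, not the StreamsConfig implementation. The class name PrefixOverrideSketch, its helper methods, and the literal prefix strings "consumer." and "global.consumer." are assumptions made for illustration; the diff itself only shows the consumerPrefix() and globalConsumerPrefix() helpers.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixOverrideSketch {
    // Assumed prefix literals; the tests only call the prefix helper methods.
    static final String CONSUMER_PREFIX = "consumer.";
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

    // Hypothetical helper: collect entries whose key starts with the prefix,
    // returning them with the prefix stripped from the key.
    static Map<String, Object> withPrefixStripped(final Map<String, Object> props, final String prefix) {
        final Map<String, Object> result = new HashMap<>();
        for (final Map.Entry<String, Object> entry : props.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Hypothetical resolution order: generic consumer settings first,
    // then global-consumer settings written on top of them.
    static Map<String, Object> globalConsumerConfigs(final Map<String, Object> props) {
        final Map<String, Object> configs = withPrefixStripped(props, CONSUMER_PREFIX);
        configs.putAll(withPrefixStripped(props, GLOBAL_CONSUMER_PREFIX));
        return configs;
    }

    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put(CONSUMER_PREFIX + "max.poll.records", "5");
        props.put(GLOBAL_CONSUMER_PREFIX + "max.poll.records", "50");
        // The more specific prefix wins, mirroring the assertion in the test.
        System.out.println(globalConsumerConfigs(props).get("max.poll.records"));
    }
}
```

Applying the generic entries first and the specific ones second keeps the rule to a single putAll call; a setting given only under the generic prefix still reaches the global consumer unchanged.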

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.
