kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9046: Use top-level worker configs for connector admin clients
Date Thu, 14 Nov 2019 08:40:15 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 970c7ee  KAFKA-9046: Use top-level worker configs for connector admin clients
970c7ee is described below

commit 970c7ee9db941a9804763f61d4e5a6c76c873de2
Author: Chris Egerton <chrise@confluent.io>
AuthorDate: Thu Nov 14 14:09:04 2019 +0530

    KAFKA-9046: Use top-level worker configs for connector admin clients
    
    [Jira](https://issues.apache.org/jira/browse/KAFKA-9046)
    
    The changes here are meant to find a healthy compromise between the pre- and post-KIP-458
    functionality of Connect workers when configuring admin clients for use with DLQs. Before
    KIP-458, admin clients were configured using the top-level worker configs; after KIP-458,
    they are configured using worker configs with a prefix of `admin.` and then optionally
    overridden by connector configs with a prefix of `admin.override.`. The behavior proposed
    here is to use, in ascending order of precedence, the top-level worker configs, the worker
    configs with a prefix of `admin.`, and the connector configs with a prefix of
    `admin.override.`. [...]
    
    Author: Chris Egerton <chrise@confluent.io>
    
    Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>, Nigel Liang <nigel@nigelliang.com>
    
    Closes #7525 from C0urante/kafka-9046
    
    (cherry picked from commit 38d243b022336ecaf5cb400ae015c485f56ff978)
    Signed-off-by: Manikumar Reddy <manikumar@confluent.io>
---
 .../java/org/apache/kafka/connect/runtime/Worker.java  | 18 ++++++++++++++++--
 .../org/apache/kafka/connect/runtime/WorkerTest.java   |  7 ++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index fbece79..814c750 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -607,8 +608,21 @@ public class Worker {
                                             Class<? extends Connector> connectorClass,
                                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         Map<String, Object> adminProps = new HashMap<>();
-        adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        // User-specified overrides
+        // Use the top-level worker configs to retain backwards compatibility with older releases which
+        // did not require a prefix for connector admin client configs in the worker configuration file
+        // Ignore configs that begin with "admin." since those will be added next (with the prefix stripped)
+        // and those that begin with "producer." and "consumer.", since we know they aren't intended for
+        // the admin client
+        Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
+            .filter(e -> !e.getKey().startsWith("admin.")
+                && !e.getKey().startsWith("producer.")
+                && !e.getKey().startsWith("consumer."))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+            Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        adminProps.putAll(nonPrefixedWorkerConfigs);
+
+        // Admin client-specific overrides in the worker config
         adminProps.putAll(config.originalsWithPrefix("admin."));
 
         // Connector-specified overrides
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 36a9b66..7021503 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -1138,12 +1138,15 @@ public class WorkerTest extends ThreadedTest {
         Map<String, String> props = new HashMap<>(workerProps);
         props.put("admin.client.id", "testid");
         props.put("admin.metadata.max.age.ms", "5000");
+        props.put("producer.bootstrap.servers", "cbeauho.com");
+        props.put("consumer.bootstrap.servers", "localhost:4761");
         WorkerConfig configWithOverrides = new StandaloneConfig(props);
 
         Map<String, Object> connConfig = new HashMap<String, Object>();
         connConfig.put("metadata.max.age.ms", "10000");
 
-        Map<String, String> expectedConfigs = new HashMap<>();
+        Map<String, String> expectedConfigs = new HashMap<>(workerProps);
+
         expectedConfigs.put("bootstrap.servers", "localhost:9092");
         expectedConfigs.put("client.id", "testid");
         expectedConfigs.put("metadata.max.age.ms", "10000");
@@ -1153,7 +1156,6 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
                                                              null, allConnectorClientConfigOverridePolicy));
-
     }
 
     @Test(expected = ConnectException.class)
@@ -1171,7 +1173,6 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.replayAll();
         Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
                                                           null, noneConnectorClientConfigOverridePolicy);
-
     }
 
     private void assertStatusMetrics(long expected, String metricName) {


Mime
View raw message