kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5293; Do not apply exponential backoff if users have overridden…
Date Thu, 01 Jun 2017 20:27:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 39c1e6259 -> b3036c586


KAFKA-5293; Do not apply exponential backoff if users have overridden…

… reconnect.backoff.ms

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3174 from cmccabe/KAFKA-5293


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3036c58
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3036c58
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3036c58

Branch: refs/heads/trunk
Commit: b3036c5861df2c5fc8a06ca5b817d43147f5ba92
Parents: 39c1e62
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Thu Jun 1 21:17:49 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Jun 1 21:21:58 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/CommonClientConfigs.java      | 27 +++++++
 .../kafka/clients/admin/AdminClientConfig.java  |  5 ++
 .../kafka/clients/consumer/ConsumerConfig.java  |  5 ++
 .../kafka/clients/producer/ProducerConfig.java  |  5 ++
 .../kafka/clients/CommonClientConfigsTest.java  | 85 ++++++++++++++++++++
 .../kafka/connect/runtime/WorkerConfig.java     |  6 ++
 .../runtime/distributed/DistributedConfig.java  |  2 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  5 +-
 8 files changed, 137 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b3036c58/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index f7103be..8634bb3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -16,16 +16,22 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Some configurations shared by both producer and consumer
  */
 public class CommonClientConfigs {
+    private static final Logger log = LoggerFactory.getLogger(CommonClientConfigs.class);
 
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
@@ -90,4 +96,25 @@ public class CommonClientConfigs {
         return names;
     }
 
+
+    /**
+     * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
+     * is explicitly configured but the maximum reconnect backoff is not explicitly configured.
+     *
+     * @param config                    The config object.
+     * @param parsedValues              The parsedValues as provided to postProcessParsedConfig.
+     *
+     * @return                          The new values which have been set as described in postProcessParsedConfig.
+     */
+    public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig
config,
+                                                    Map<String, Object> parsedValues)
{
+        HashMap<String, Object> rval = new HashMap<>();
+        if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
+                config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
+            log.debug("Disabling exponential reconnect backoff because " + RECONNECT_BACKOFF_MS_CONFIG
+
+                " is set, but " + RECONNECT_BACKOFF_MAX_MS_CONFIG + " is not.");
+            rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
+        }
+        return rval;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3036c58/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 62a48a8..49f5753 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -160,6 +160,11 @@ public class AdminClientConfig extends AbstractConfig {
                                 .withClientSaslSupport();
     }
 
+    @Override
+    protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
parsedValues) {
+        return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+    }
+
     AdminClientConfig(Map<?, ?> props) {
         super(CONFIG, props);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3036c58/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b838b14..01b8c33 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -447,6 +447,11 @@ public class ConsumerConfig extends AbstractConfig {
 
     }
 
+    @Override
+    protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
parsedValues) {
+        return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+    }
+
     public static Map<String, Object> addDeserializerToConfig(Map<String, Object>
configs,
                                                               Deserializer<?> keyDeserializer,
                                                               Deserializer<?> valueDeserializer)
{

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3036c58/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 399a5a5..2059495 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -328,6 +328,11 @@ public class ProducerConfig extends AbstractConfig {
                                         TRANSACTIONAL_ID_DOC);
     }
 
+    @Override
+    protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
parsedValues) {
+        return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+    }
+
     public static Map<String, Object> addSerializerToConfig(Map<String, Object>
configs,
                                                             Serializer<?> keySerializer,
Serializer<?> valueSerializer) {
         Map<String, Object> newConfigs = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3036c58/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
new file mode 100644
index 0000000..63a9312
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.junit.Assert.assertEquals;
+
+public class CommonClientConfigsTest {
+    private static class TestConfig extends AbstractConfig {
+        private static final ConfigDef CONFIG;
+        static {
+            CONFIG = new ConfigDef()
+                .define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG,
+                    ConfigDef.Type.LONG,
+                    50L,
+                    atLeast(0L),
+                    ConfigDef.Importance.LOW,
+                    "")
+                .define(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
+                    ConfigDef.Type.LONG,
+                    1000L,
+                    atLeast(0L),
+                    ConfigDef.Importance.LOW,
+                    "");
+        }
+
+        @Override
+        protected Map<String, Object> postProcessParsedConfig(final Map<String,
Object> parsedValues) {
+            return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+        }
+
+        public TestConfig(Map<?, ?> props) {
+            super(CONFIG, props);
+        }
+    }
+
+    @Test
+    public void testExponentialBackoffDefaults() throws Exception {
+        TestConfig defaultConf = new TestConfig(Collections.emptyMap());
+        assertEquals(Long.valueOf(50L),
+                defaultConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG));
+        assertEquals(Long.valueOf(1000L),
+                defaultConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG));
+
+        TestConfig bothSetConfig = new TestConfig(new HashMap<String, Object>() {{
+                put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "123");
+                put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "12345");
+            }});
+        assertEquals(Long.valueOf(123L),
+                bothSetConfig.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG));
+        assertEquals(Long.valueOf(12345L),
+                bothSetConfig.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG));
+
+        TestConfig reconnectBackoffSetConf = new TestConfig(new HashMap<String, Object>()
{{
+                put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "123");
+            }});
+        assertEquals(Long.valueOf(123L),
+                reconnectBackoffSetConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG));
+        assertEquals(Long.valueOf(123L),
+                reconnectBackoffSetConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3036c58/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index fe7a35a..9ac1b3b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -180,6 +181,11 @@ public class WorkerConfig extends AbstractConfig {
                 );
     }
 
+    @Override
+    protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
parsedValues) {
+        return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+    }
+
     public static List<String> pluginLocations(Map<String, String> props) {
         String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
         return locationList == null

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3036c58/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 8c6c4a4..650ef67 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -184,7 +184,7 @@ public class DistributedConfig extends WorkerConfig {
                         CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
                 .define(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
                         ConfigDef.Type.LONG,
-                        50L,
+                        1000L,
                         atLeast(0L),
                         ConfigDef.Importance.LOW,
                         CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3036c58/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 582aa9b..74db9a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -415,7 +415,7 @@ public class StreamsConfig extends AbstractConfig {
                     CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
             .define(RECONNECT_BACKOFF_MAX_MS_CONFIG,
                     Type.LONG,
-                    50L,
+                    1000L,
                     atLeast(0L),
                     ConfigDef.Importance.LOW,
                     CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
@@ -569,7 +569,8 @@ public class StreamsConfig extends AbstractConfig {
 
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
parsedValues) {
-        final Map<String, Object> configUpdates = new HashMap<>();
+        final Map<String, Object> configUpdates =
+            CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
 
         final boolean eosEnabled = EXACTLY_ONCE.equals(parsedValues.get(PROCESSING_GUARANTEE_CONFIG));
         if (eosEnabled && !originals().containsKey(COMMIT_INTERVAL_MS_CONFIG)) {


Mime
View raw message