kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [kafka] branch 2.7 updated: KAFKA-10570; Rename JMXReporter configs for KIP-629
Date Tue, 13 Oct 2020 19:37:15 GMT
This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new e6b543a  KAFKA-10570; Rename JMXReporter configs for KIP-629
e6b543a is described below

commit e6b543a44a3989a190011868b4f8c6ec56e78aa9
Author: Xavier Léauté <xvrl@apache.org>
AuthorDate: Tue Oct 13 12:33:05 2020 -0700

    KAFKA-10570; Rename JMXReporter configs for KIP-629
    
    * rename whitelist/blacklist to include/exclude
    * add utility methods to translate deprecated configs
    
    Author: Xavier Léauté <xvrl@apache.org>
    
    Reviewers: Gwen Shapira
    
    Closes #9367 from xvrl/kafka-10570
    
    (cherry picked from commit f46d4f4fce341326c06c0aa8b2d0d64982573658)
    Signed-off-by: Gwen Shapira <cshapi@gmail.com>
---
 .../apache/kafka/common/metrics/JmxReporter.java   |  46 ++++---
 .../org/apache/kafka/common/utils/ConfigUtils.java | 116 +++++++++++++++++
 .../kafka/common/metrics/JmxReporterTest.java      |   4 +-
 .../apache/kafka/common/utils/ConfigUtilsTest.java | 143 +++++++++++++++++++++
 .../scala/unit/kafka/metrics/MetricsTest.scala     |   4 +-
 5 files changed, 291 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 73522a9..3867091 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.metrics;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.ConfigUtils;
 import org.apache.kafka.common.utils.Sanitizer;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -51,14 +52,20 @@ public class JmxReporter implements MetricsReporter {
 
     public static final String METRICS_CONFIG_PREFIX = "metrics.jmx.";
 
-    public static final String BLACKLIST_CONFIG = METRICS_CONFIG_PREFIX + "blacklist";
-    public static final String WHITELIST_CONFIG = METRICS_CONFIG_PREFIX + "whitelist";
+    public static final String EXCLUDE_CONFIG = METRICS_CONFIG_PREFIX + "exclude";
+    public static final String EXCLUDE_CONFIG_ALIAS = METRICS_CONFIG_PREFIX + "blacklist";
 
-    public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(WHITELIST_CONFIG,
-                                                                          BLACKLIST_CONFIG);
+    public static final String INCLUDE_CONFIG = METRICS_CONFIG_PREFIX + "include";
+    public static final String INCLUDE_CONFIG_ALIAS = METRICS_CONFIG_PREFIX + "whitelist";
 
-    public static final String DEFAULT_WHITELIST = ".*";
-    public static final String DEFAULT_BLACKLIST = "";
+
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(INCLUDE_CONFIG,
+                                                                         INCLUDE_CONFIG_ALIAS,
+                                                                         EXCLUDE_CONFIG,
+                                                                         EXCLUDE_CONFIG_ALIAS);
+
+    public static final String DEFAULT_INCLUDE = ".*";
+    public static final String DEFAULT_EXCLUDE = "";
 
     private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
     private static final Object LOCK = new Object();
@@ -300,27 +307,30 @@ public class JmxReporter implements MetricsReporter {
 
     }
 
-    public static Predicate<String> compilePredicate(Map<String, ?> configs) {
-        String whitelist = (String) configs.get(WHITELIST_CONFIG);
-        String blacklist = (String) configs.get(BLACKLIST_CONFIG);
+    public static Predicate<String> compilePredicate(Map<String, ?> originalConfig) {
+        Map<String, ?> configs = ConfigUtils.translateDeprecatedConfigs(
+            originalConfig, new String[][]{{INCLUDE_CONFIG, INCLUDE_CONFIG_ALIAS},
+                                           {EXCLUDE_CONFIG, EXCLUDE_CONFIG_ALIAS}});
+        String include = (String) configs.get(INCLUDE_CONFIG);
+        String exclude = (String) configs.get(EXCLUDE_CONFIG);
 
-        if (whitelist == null) {
-            whitelist = DEFAULT_WHITELIST;
+        if (include == null) {
+            include = DEFAULT_INCLUDE;
         }
 
-        if (blacklist == null) {
-            blacklist = DEFAULT_BLACKLIST;
+        if (exclude == null) {
+            exclude = DEFAULT_EXCLUDE;
         }
 
         try {
-            Pattern whitelistPattern = Pattern.compile(whitelist);
-            Pattern blacklistPattern = Pattern.compile(blacklist);
+            Pattern includePattern = Pattern.compile(include);
+            Pattern excludePattern = Pattern.compile(exclude);
 
-            return s -> whitelistPattern.matcher(s).matches()
-                        && !blacklistPattern.matcher(s).matches();
+            return s -> includePattern.matcher(s).matches()
+                        && !excludePattern.matcher(s).matches();
         } catch (PatternSyntaxException e) {
             throw new ConfigException("JMX filter for configuration" + METRICS_CONFIG_PREFIX
-                                      + ".(whitelist/blacklist) is not a valid regular expression");
+                                      + ".(include/exclude) is not a valid regular expression");
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
new file mode 100644
index 0000000..504a1f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConfigUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(ConfigUtils.class);
+
+    /**
+     * Translates deprecated configurations into their non-deprecated equivalents
+     *
+     * This is a convenience method for {@link ConfigUtils#translateDeprecatedConfigs(Map, Map)}
+     * until we can use Java 9+ {@code Map.of(..)} and {@code Set.of(...)}
+     *
+     * @param configs the input configuration
+     * @param aliasGroups An array of arrays of synonyms.  Each synonym array begins with the non-deprecated synonym
+     *                    For example, new String[][] { { a, b }, { c, d, e} }
+     *                    would declare b as a deprecated synonym for a,
+     *                    and d and e as deprecated synonyms for c.
+     *                    The ordering of synonyms determines the order of precedence
+     *                    (e.g. the first synonym takes precedence over the second one)
+     * @return a new configuration map with deprecated  keys translated to their non-deprecated equivalents
+     */
+    public static <T> Map<String, T> translateDeprecatedConfigs(Map<String, T> configs, String[][] aliasGroups) {
+        return translateDeprecatedConfigs(configs, Stream.of(aliasGroups)
+            .collect(Collectors.toMap(x -> x[0], x -> Stream.of(x).skip(1).collect(Collectors.toList()))));
+    }
+
+    /**
+     * Translates deprecated configurations into their non-deprecated equivalents
+     *
+     * @param configs the input configuration
+     * @param aliasGroups A map of config to synonyms.  Each key is the non-deprecated synonym
+     *                    For example, Map.of(a , Set.of(b), c, Set.of(d, e))
+     *                    would declare b as a deprecated synonym for a,
+     *                    and d and e as deprecated synonyms for c.
+     *                    The ordering of synonyms determines the order of precedence
+     *                    (e.g. the first synonym takes precedence over the second one)
+     * @return a new configuration map with deprecated  keys translated to their non-deprecated equivalents
+     */
+    public static <T> Map<String, T> translateDeprecatedConfigs(Map<String, T> configs,
+                                                                Map<String, List<String>> aliasGroups) {
+        Set<String> aliasSet = Stream.concat(
+            aliasGroups.keySet().stream(),
+            aliasGroups.values().stream().flatMap(Collection::stream))
+            .collect(Collectors.toSet());
+
+        // pass through all configurations without aliases
+        Map<String, T> newConfigs = configs.entrySet().stream()
+            .filter(e -> !aliasSet.contains(e.getKey()))
+            // filter out null values
+            .filter(e -> Objects.nonNull(e.getValue()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        aliasGroups.forEach((target, aliases) -> {
+            List<String> deprecated = aliases.stream()
+                .filter(configs::containsKey)
+                .collect(Collectors.toList());
+
+            if (deprecated.isEmpty()) {
+                // No deprecated key(s) found.
+                if (configs.containsKey(target)) {
+                    newConfigs.put(target, configs.get(target));
+                }
+                return;
+            }
+
+            String aliasString = String.join(", ", deprecated);
+
+            if (configs.containsKey(target)) {
+                // Ignore the deprecated key(s) because the actual key was set.
+                log.error(target + " was configured, as well as the deprecated alias(es) " +
+                          aliasString + ".  Using the value of " + target);
+                newConfigs.put(target, configs.get(target));
+            } else if (deprecated.size() > 1) {
+                log.error("The configuration keys " + aliasString + " are deprecated and may be " +
+                          "removed in the future.  Additionally, this configuration is ambigous because " +
+                          "these configuration keys are all aliases for " + target + ".  Please update " +
+                          "your configuration to have only " + target + " set.");
+                newConfigs.put(target, configs.get(deprecated.get(0)));
+            } else {
+                log.warn("Configuration key " + deprecated.get(0) + " is deprecated and may be removed " +
+                         "in the future.  Please update your configuration to use " + target + " instead.");
+                newConfigs.put(target, configs.get(deprecated.get(0)));
+            }
+        });
+
+        return newConfigs;
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index 3285889..15eebdc 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -124,7 +124,7 @@ public class JmxReporterTest {
 
         Map<String, String> configs = new HashMap<>();
 
-        configs.put(JmxReporter.BLACKLIST_CONFIG,
+        configs.put(JmxReporter.EXCLUDE_CONFIG,
                     JmxReporter.getMBeanName("", metrics.metricName("pack.bean2.total", "grp2")));
 
         try {
@@ -143,7 +143,7 @@ public class JmxReporterTest {
 
             sensor.record();
 
-            configs.put(JmxReporter.BLACKLIST_CONFIG,
+            configs.put(JmxReporter.EXCLUDE_CONFIG,
                         JmxReporter.getMBeanName("", metrics.metricName("pack.bean2.avg", "grp1")));
 
             reporter.reconfigure(configs);
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java
new file mode 100644
index 0000000..c77cb0e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ConfigUtilsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class ConfigUtilsTest {
+
+    @Test
+    public void testTranslateDeprecated() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("foo.bar", "baz");
+        config.put("foo.bar.deprecated", "quux");
+        config.put("chicken", "1");
+        config.put("rooster", "2");
+        config.put("hen", "3");
+        config.put("heifer", "moo");
+        config.put("blah", "blah");
+        config.put("unexpected.non.string.object", 42);
+        Map<String, Object> newConfig = ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+            {"foo.bar", "foo.bar.deprecated"},
+            {"chicken", "rooster", "hen"},
+            {"cow", "beef", "heifer", "steer"}
+        });
+        assertEquals("baz", newConfig.get("foo.bar"));
+        assertEquals(null, newConfig.get("foobar.deprecated"));
+        assertEquals("1", newConfig.get("chicken"));
+        assertEquals(null, newConfig.get("rooster"));
+        assertEquals(null, newConfig.get("hen"));
+        assertEquals("moo", newConfig.get("cow"));
+        assertEquals(null, newConfig.get("beef"));
+        assertEquals(null, newConfig.get("heifer"));
+        assertEquals(null, newConfig.get("steer"));
+        assertEquals(null, config.get("cow"));
+        assertEquals("blah", config.get("blah"));
+        assertEquals("blah", newConfig.get("blah"));
+        assertEquals(42, newConfig.get("unexpected.non.string.object"));
+        assertEquals(42, config.get("unexpected.non.string.object"));
+
+    }
+
+    @Test
+    public void testAllowsNewKey() {
+        Map<String, String> config = new HashMap<>();
+        config.put("foo.bar", "baz");
+        Map<String, String> newConfig = ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+            {"foo.bar", "foo.bar.deprecated"},
+            {"chicken", "rooster", "hen"},
+            {"cow", "beef", "heifer", "steer"}
+        });
+        assertNotNull(newConfig);
+        assertEquals("baz", newConfig.get("foo.bar"));
+        assertNull(newConfig.get("foo.bar.deprecated"));
+    }
+
+    @Test
+    public void testAllowDeprecatedNulls() {
+        Map<String, String> config = new HashMap<>();
+        config.put("foo.bar.deprecated", null);
+        config.put("foo.bar", "baz");
+        Map<String, String> newConfig = ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+            {"foo.bar", "foo.bar.deprecated"}
+        });
+        assertNotNull(newConfig);
+        assertEquals("baz", newConfig.get("foo.bar"));
+        assertNull(newConfig.get("foo.bar.deprecated"));
+    }
+
+    @Test
+    public void testAllowNullOverride() {
+        Map<String, String> config = new HashMap<>();
+        config.put("foo.bar.deprecated", "baz");
+        config.put("foo.bar", null);
+        Map<String, String> newConfig = ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+            {"foo.bar", "foo.bar.deprecated"}
+        });
+        assertNotNull(newConfig);
+        assertNull(newConfig.get("foo.bar"));
+        assertNull(newConfig.get("foo.bar.deprecated"));
+    }
+
+    @Test
+    public void testNullMapEntriesWithoutAliasesDoNotThrowNPE() {
+        Map<String, String> config = new HashMap<>();
+        config.put("other", null);
+        Map<String, String> newConfig = ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+            {"foo.bar", "foo.bar.deprecated"}
+        });
+        assertNotNull(newConfig);
+        assertNull(newConfig.get("other"));
+    }
+
+    @Test
+    public void testDuplicateSynonyms() {
+        Map<String, String> config = new HashMap<>();
+        config.put("foo.bar", "baz");
+        config.put("foo.bar.deprecated", "derp");
+        Map<String, String> newConfig = ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+            {"foo.bar", "foo.bar.deprecated"},
+            {"chicken", "foo.bar.deprecated"}
+        });
+        assertNotNull(newConfig);
+        assertEquals("baz", newConfig.get("foo.bar"));
+        assertEquals("derp", newConfig.get("chicken"));
+        assertNull(newConfig.get("foo.bar.deprecated"));
+    }
+
+    @Test
+    public void testMultipleDeprecations() {
+        Map<String, String> config = new HashMap<>();
+        config.put("foo.bar.deprecated", "derp");
+        config.put("foo.bar.even.more.deprecated", "very old configuration");
+        Map<String, String> newConfig = ConfigUtils.translateDeprecatedConfigs(config, new String[][]{
+            {"foo.bar", "foo.bar.deprecated", "foo.bar.even.more.deprecated"}
+        });
+        assertNotNull(newConfig);
+        assertEquals("derp", newConfig.get("foo.bar"));
+        assertNull(newConfig.get("foo.bar.deprecated"));
+        assertNull(newConfig.get("foo.bar.even.more.deprecated"));
+    }
+}
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 7d8643f..bac15de 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -40,7 +40,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
 
   val overridingProps = new Properties
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
-  overridingProps.put(JmxReporter.BLACKLIST_CONFIG, "kafka.server:type=KafkaServer,name=ClusterId")
+  overridingProps.put(JmxReporter.EXCLUDE_CONFIG, "kafka.server:type=KafkaServer,name=ClusterId")
 
   def generateConfigs =
     TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
@@ -90,7 +90,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   def testUpdateJMXFilter(): Unit = {
     // verify previously exposed metrics are removed and existing matching metrics are added
     servers.foreach(server => server.kafkaYammerMetrics.reconfigure(
-      Map(JmxReporter.BLACKLIST_CONFIG -> "kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava
+      Map(JmxReporter.EXCLUDE_CONFIG -> "kafka.controller:type=KafkaController,name=ActiveControllerCount").asJava
     ))
     assertFalse(ManagementFactory.getPlatformMBeanServer
                  .isRegistered(new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")))


Mime
View raw message