kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkaranta...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10069: Correctly remove user-defined "predicate" and "negate" configs from transformation properties (#8755)
Date Thu, 04 Jun 2020 21:51:04 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 87f54f0  KAFKA-10069: Correctly remove user-defined "predicate" and "negate" configs from transformation properties (#8755)
87f54f0 is described below

commit 87f54f060c67738a6318246864d3409f8790add5
Author: Chia-Ping Tsai <chia7712@gmail.com>
AuthorDate: Thu Jun 4 13:13:50 2020 +0800

    KAFKA-10069: Correctly remove user-defined "predicate" and "negate" configs from transformation properties (#8755)
    
    With the recent introduction of predicated SMTs, properties named "predicate" and "negate" should be ignored and removed in case they are present in transformation configs.
    This commit fixes the equality check to be with the key of the config to apply proper removal.
    
    Reviewers: Tom Bentley <tbentley@redhat.com>, Konstantine Karantasis <konstantine@confluent.io>
---
 .../kafka/connect/runtime/ConnectorConfig.java     |  6 +--
 .../kafka/connect/runtime/ConnectorConfigTest.java | 49 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 37a710c..69f417b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -331,10 +331,10 @@ public class ConnectorConfig extends AbstractConfig {
                 return super.configDefsForClass(typeConfig)
                     .filter(entry -> {
                         // The implicit parameters mask any from the transformer with the
same name
-                        if (PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getValue())
-                                || PredicatedTransformation.NEGATE_CONFIG.equals(entry.getValue()))
{
+                        if (PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getKey())
+                                || PredicatedTransformation.NEGATE_CONFIG.equals(entry.getKey()))
{
                             log.warn("Transformer config {} is masked by implicit config
of that name",
-                                    entry.getValue());
+                                    entry.getKey());
                             return false;
                         } else {
                             return true;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index c35663b..c400e48 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -434,5 +436,52 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>>
{
         }
     }
 
+    @Test
+    public void testEnrichedConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), props, false);
+        assertEnrichedConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY,
ConfigDef.Type.BOOLEAN);
+        assertEnrichedConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertEnrichedConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+    }
+
+    private static void assertEnrichedConfigDef(ConfigDef def, String prefix, String keyName,
ConfigDef.Type expectedType) {
+        assertNull(def.configKeys().get(keyName));
+        ConfigDef.ConfigKey configKey = def.configKeys().get(prefix + keyName);
+        assertNotNull(prefix + keyName + "' config must be present", configKey);
+        assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType,
configKey.type);
+    }
+
+    public static class HasDuplicateConfigTransformation<R extends ConnectRecord<R>>
implements Transformation<R> {
+        private static final String MUST_EXIST_KEY = "must.exist.key";
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+                // this configDef is duplicate. It should be removed automatically so as
to avoid duplicate config error.
+                .define(PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.MEDIUM, "fake")
+                // this configDef is duplicate. It should be removed automatically so as
to avoid duplicate config error.
+                .define(PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.INT, 123,
ConfigDef.Importance.MEDIUM, "fake")
+                // this configDef should appear if above duplicate configDef is removed without
any error
+                .define(MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM,
"this key must exist");
+
+        @Override
+        public R apply(R record) {
+            return record;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
 
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+    }
 }


Mime
View raw message