kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch trunk updated: KAFKA-4930: Enforce set of legal characters for connector names (KIP-212)
Date Wed, 31 Jan 2018 16:49:38 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 530bc59  KAFKA-4930: Enforce set of legal characters for connector names (KIP-212)
530bc59 is described below

commit 530bc59de2732ba71e82c0110c59e9f6162531c6
Author: Soenke Liebau <soenke.liebau@opencore.com>
AuthorDate: Wed Jan 31 08:49:23 2018 -0800

    KAFKA-4930: Enforce set of legal characters for connector names (KIP-212)
    
    Add a validator to check for an empty connector name and illegal characters in the connector name. This also
fixes KAFKA-4938 by removing the check for slashes in the connector name from ConnectorsResource.
    
    Author: Ewen Cheslack-Postava <me@ewencp.org>
    Author: Soenke Liebau <soenke.liebau@opencore.com>
    
    Reviewers: Gwen Shapira <cshapi@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>,
Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #2755 from soenkeliebau/KAFKA-4930
---
 .../org/apache/kafka/common/config/ConfigDef.java  |  34 ++++++
 .../apache/kafka/common/config/ConfigDefTest.java  |   3 +
 .../kafka/connect/runtime/ConnectorConfig.java     |   3 +-
 .../runtime/rest/resources/ConnectorsResource.java |   9 +-
 .../kafka/connect/runtime/ConnectorConfigTest.java |  10 +-
 .../rest/resources/ConnectorsResourceTest.java     | 119 +++++++++++++++++++--
 6 files changed, 163 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 3080298..bb199dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -982,6 +982,40 @@ public class ConfigDef {
         }
     }
 
+    public static class NonEmptyStringWithoutControlChars implements Validator {
+
+        public static NonEmptyStringWithoutControlChars nonEmptyStringWithoutControlChars()
{
+            return new NonEmptyStringWithoutControlChars();
+        }
+
+        @Override
+        public void ensureValid(String name, Object value) {
+            String s = (String) value;
+
+            if (s == null) {
+                // This can happen during creation of the config object due to no default
value being defined for the
+                // name configuration - a missing name parameter is caught when checking
for mandatory parameters,
+                // thus we can ok a null value here
+                return;
+            } else if (s.isEmpty()) {
+                throw new ConfigException(name, value, "String may not be empty");
+            }
+
+            // Check name string for illegal characters
+            ArrayList<Integer> foundIllegalCharacters = new ArrayList<>();
+
+            for (int i = 0; i < s.length(); i++) {
+                if (Character.isISOControl(s.codePointAt(i))) {
+                    foundIllegalCharacters.add(s.codePointAt(i));
+                }
+            }
+
+            if (!foundIllegalCharacters.isEmpty()) {
+                throw new ConfigException(name, value, "String may not contain control sequences
but had the following ASCII chars: " + Utils.join(foundIllegalCharacters, ", "));
+            }
+        }
+    }
+
     public static class ConfigKey {
         public final String name;
         public final Type type;
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 602147b..339c51a 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -160,6 +160,9 @@ public class ConfigDefTest {
         testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1",
"2", "3"}, new Object[]{"4", "5", "6"});
         testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"},
new Object[] {null});
         testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(),
ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[] {null, -1, "c"});
+        testValidators(Type.STRING, new ConfigDef.NonEmptyStringWithoutControlChars(), "defaultname",
+                new Object[]{"test", "name", "test/test", "test\u1234", "\u1324name\\", "/+%>&):??<&()?-",
"+1", "\uD83D\uDE01", "\uF3B1", "     test   \n\r", "\n  hello \t"},
+                new Object[]{"nontrailing\nnotallowed", "as\u0001cii control char", "tes\rt",
"test\btest", "1\t2", ""});
     }
 
     @Test
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index e63d100..aad12c3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -37,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
 
 /**
  * <p>
@@ -96,7 +97,7 @@ public class ConnectorConfig extends AbstractConfig {
 
     public static ConfigDef configDef() {
         return new ConfigDef()
-                .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP,
1, Width.MEDIUM, NAME_DISPLAY)
+                .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(),
Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
                 .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC,
COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
                 .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG),
Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
                 .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC,
COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 7a01168..e966104 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -90,10 +90,11 @@ public class ConnectorsResource {
     @Path("/")
     public Response createConnector(final @QueryParam("forward") Boolean forward,
                                     final CreateConnectorRequest createRequest) throws Throwable
{
-        String name = createRequest.name();
-        if (name.contains("/")) {
-            throw new BadRequestException("connector name should not contain '/'");
-        }
+        // Trim leading and trailing whitespaces from the connector name, replace null with
empty string
+        // if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars
+        // allows null values)
+        String name = createRequest.name() == null ? "" : createRequest.name().trim();
+
         Map<String, String> configs = createRequest.config();
         checkAndPutConnectorConfigName(name, configs);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index f8c4fd6..fe1bf26 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -91,6 +91,14 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>>
{
     }
 
     @Test(expected = ConfigException.class)
+    public void emptyConnectorName() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "");
+        props.put("connector.class", TestConnector.class.getName());
+        new ConnectorConfig(MOCK_PLUGINS, props);
+    }
+
+    @Test(expected = ConfigException.class)
     public void wrongTransformationType() {
         Map<String, String> props = new HashMap<>();
         props.put("name", "test");
@@ -168,5 +176,5 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>>
{
         assertEquals(42, ((SimpleTransformation) transformations.get(0)).magicNumber);
         assertEquals(84, ((SimpleTransformation) transformations.get(1)).magicNumber);
     }
-    
+
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 6e17349..cc46080 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -68,8 +68,11 @@ public class ConnectorsResourceTest {
     // URL construction properly, avoiding //, which will mess up routing in the REST server
     private static final String LEADER_URL = "http://leader:8083/";
     private static final String CONNECTOR_NAME = "test";
-    private static final String CONNECTOR_NAME_SPECIAL_CHARS = "t?a=b&c=d\rx=1.1\n><
\t`'\" x%y+z!#$&'()*+,:;=?@[]";
+    private static final String CONNECTOR_NAME_SPECIAL_CHARS = "ta/b&c=d//\\rx=1þ.1><
`'\" x%y+z!ሴ#$&'(æ)*+,:;=?ñ@[]ÿ";
+    private static final String CONNECTOR_NAME_CONTROL_SEQUENCES1 = "ta/b&c=drx=1\n.1><
`'\" x%y+z!#$&'()*+,:;=?@[]";
     private static final String CONNECTOR2_NAME = "test2";
+    private static final String CONNECTOR_NAME_ALL_WHITESPACES = "   \t\n  \b";
+    private static final String CONNECTOR_NAME_PADDING_WHITESPACES = "   " + CONNECTOR_NAME
+ "  \n  ";
     private static final Boolean FORWARD = true;
     private static final Map<String, String> CONNECTOR_CONFIG_SPECIAL_CHARS = new HashMap<>();
     static {
@@ -82,6 +85,24 @@ public class ConnectorsResourceTest {
         CONNECTOR_CONFIG.put("name", CONNECTOR_NAME);
         CONNECTOR_CONFIG.put("sample_config", "test_config");
     }
+
+    private static final Map<String, String> CONNECTOR_CONFIG_CONTROL_SEQUENCES = new
HashMap<>();
+    static {
+        CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name", CONNECTOR_NAME_CONTROL_SEQUENCES1);
+        CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("sample_config", "test_config");
+    }
+
+    private static final Map<String, String> CONNECTOR_CONFIG_WITHOUT_NAME = new HashMap<>();
+    static {
+        CONNECTOR_CONFIG_WITHOUT_NAME.put("sample_config", "test_config");
+    }
+
+    private static final Map<String, String> CONNECTOR_CONFIG_WITH_EMPTY_NAME = new
HashMap<>();
+
+    static {
+        CONNECTOR_CONFIG_WITH_EMPTY_NAME.put(ConnectorConfig.NAME_CONFIG, "");
+        CONNECTOR_CONFIG_WITH_EMPTY_NAME.put("sample_config", "test_config");
+    }
     private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES = Arrays.asList(
             new ConnectorTaskId(CONNECTOR_NAME, 0),
             new ConnectorTaskId(CONNECTOR_NAME, 1)
@@ -108,6 +129,12 @@ public class ConnectorsResourceTest {
         connectorsResource = new ConnectorsResource(herder, null);
     }
 
+    private static final Map<String, String> getConnectorConfig(Map<String, String>
mapToClone) {
+        Map<String, String> result = new HashMap<>();
+        result.putAll(mapToClone);
+        return result;
+    }
+
     @Test
     public void testListConnectors() throws Throwable {
         final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
@@ -206,20 +233,59 @@ public class ConnectorsResourceTest {
         PowerMock.verifyAll();
     }
 
-    @Test(expected = BadRequestException.class)
-    public void testCreateConnectorWithASlashInItsName() throws Throwable {
-        String badConnectorName = CONNECTOR_NAME + "/" + "test";
+    @Test
+    public void testCreateConnectorNameTrimWhitespaces() throws Throwable {
+        // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the
name in it) and this
+        // will affect later tests
+        Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
+        final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES,
inputConfig);
+        final CreateConnectorRequest bodyOut = new CreateConnectorRequest(CONNECTOR_NAME,
CONNECTOR_CONFIG);
+
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(bodyOut.name()), EasyMock.eq(bodyOut.config()),
EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(),
bodyOut.config(), CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(FORWARD, bodyIn);
 
-        CreateConnectorRequest body = new CreateConnectorRequest(badConnectorName, Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
badConnectorName));
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateConnectorNameAllWhitespaces() throws Throwable {
+        // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the
name in it) and this
+        // will affect later tests
+        Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
+        final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES,
inputConfig);
+        final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME);
 
         final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()),
EasyMock.eq(false), EasyMock.capture(cb));
-        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME,
CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
-            ConnectorType.SOURCE)));
+        herder.putConnectorConfig(EasyMock.eq(bodyOut.name()), EasyMock.eq(bodyOut.config()),
EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(),
bodyOut.config(), CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, bodyIn);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateConnectorNoName() throws Throwable {
+        // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the
name in it) and this
+        // will affect later tests
+        Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
+        final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, inputConfig);
+        final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME);
+
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(bodyOut.name()), EasyMock.eq(bodyOut.config()),
EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(),
bodyOut.config(), CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(FORWARD, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -343,6 +409,24 @@ public class ConnectorsResourceTest {
     }
 
     @Test
+    public void testCreateConnectorWithControlSequenceInName() throws Throwable {
+        CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME_CONTROL_SEQUENCES1));
+
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), EasyMock.eq(body.config()),
EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1,
CONNECTOR_CONFIG,
+                CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+        PowerMock.replayAll();
+
+        String rspLocation = connectorsResource.createConnector(FORWARD, body).getLocation().toString();
+        String decoded = new URI(rspLocation).getPath();
+        Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testPutConnectorConfigWithSpecialCharsInName() throws Throwable {
         final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
 
@@ -359,6 +443,23 @@ public class ConnectorsResourceTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPutConnectorConfigWithControlSequenceInName() throws Throwable {
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), EasyMock.eq(CONNECTOR_CONFIG_CONTROL_SEQUENCES),
EasyMock.eq(true), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1,
CONNECTOR_CONFIG_CONTROL_SEQUENCES, CONNECTOR_TASK_NAMES,
+                ConnectorType.SINK)));
+
+        PowerMock.replayAll();
+
+        String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1,
FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
+        String decoded = new URI(rspLocation).getPath();
+        Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
+
+        PowerMock.verifyAll();
+    }
+
     @Test(expected = BadRequestException.class)
     public void testPutConnectorConfigNameMismatch() throws Throwable {
         Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.

Mime
View raw message