kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2764: Change use of Properties in Copycat to Maps.
Date Fri, 06 Nov 2015 21:21:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a76660ac8 -> c006c5916


KAFKA-2764: Change use of Properties in Copycat to Maps.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Gwen Shapira, Guozhang Wang

Closes #444 from ewencp/kafka-2764-maps-not-properties


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c006c591
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c006c591
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c006c591

Branch: refs/heads/trunk
Commit: c006c5916ef7a8048ed55db64db27c7b64a3af59
Parents: a76660a
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Fri Nov 6 13:27:45 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Nov 6 13:27:45 2015 -0800

----------------------------------------------------------------------
 .../kafka/common/config/AbstractConfig.java     | 19 +++++++-
 .../kafka/copycat/connector/Connector.java      | 12 ++---
 .../apache/kafka/copycat/connector/Task.java    |  4 +-
 .../org/apache/kafka/copycat/sink/SinkTask.java |  3 +-
 .../apache/kafka/copycat/source/SourceTask.java |  4 +-
 .../connector/ConnectorReconfigurationTest.java | 11 +++--
 .../copycat/file/FileStreamSinkConnector.java   | 15 +++---
 .../kafka/copycat/file/FileStreamSinkTask.java  |  5 +-
 .../copycat/file/FileStreamSourceConnector.java | 19 ++++----
 .../copycat/file/FileStreamSourceTask.java      |  6 +--
 .../file/FileStreamSinkConnectorTest.java       | 17 ++++---
 .../file/FileStreamSourceConnectorTest.java     | 27 ++++++-----
 .../copycat/file/FileStreamSourceTaskTest.java  | 12 ++---
 .../kafka/copycat/cli/CopycatDistributed.java   |  8 +--
 .../kafka/copycat/cli/CopycatStandalone.java    | 14 +++---
 .../apache/kafka/copycat/runtime/Worker.java    | 21 +++-----
 .../kafka/copycat/runtime/WorkerConfig.java     |  4 +-
 .../kafka/copycat/runtime/WorkerSinkTask.java   | 21 ++++----
 .../kafka/copycat/runtime/WorkerSourceTask.java |  8 +--
 .../kafka/copycat/runtime/WorkerTask.java       |  4 +-
 .../runtime/distributed/DistributedConfig.java  |  4 +-
 .../runtime/standalone/StandaloneConfig.java    |  4 +-
 .../copycat/runtime/WorkerSinkTaskTest.java     | 17 +++----
 .../copycat/runtime/WorkerSourceTaskTest.java   | 16 +++---
 .../kafka/copycat/runtime/WorkerTest.java       | 51 ++++++++++----------
 .../distributed/DistributedHerderTest.java      |  3 +-
 26 files changed, 167 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 327a9ed..07b64c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -105,9 +105,9 @@ public class AbstractConfig {
         return keys;
     }
 
-    public Properties unusedProperties() {
+    public Map<String, Object> unusedConfigs() {
         Set<String> unusedKeys = this.unused();
-        Properties unusedProps = new Properties();
+        Map<String, Object> unusedProps = new HashMap<>();
         for (String key : unusedKeys)
             unusedProps.put(key, this.originals.get(key));
         return unusedProps;
@@ -120,6 +120,21 @@ public class AbstractConfig {
     }
 
     /**
+     * Get all the original settings, ensuring that all values are of type String.
+     * @return the original settings
+     * @throws ClassCastException if any of the values are not strings
+     */
+    public Map<String, String> originalsStrings() {
+        Map<String, String> copy = new RecordingMap<>();
+        for (Map.Entry<String, ?> entry : originals.entrySet()) {
+            if (!(entry.getValue() instanceof String))
+                throw new ClassCastException("Non-string value found in original settings");
+            copy.put(entry.getKey(), (String) entry.getValue());
+        }
+        return copy;
+    }
+
+    /**
      * Gets all original settings with the given prefix, stripping the prefix before adding it to the output.
      *
      * @param prefix the prefix to use as a filter

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
index ae141c4..6972d3d 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
@@ -20,7 +20,7 @@ package org.apache.kafka.copycat.connector;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * <p>
@@ -69,7 +69,7 @@ public abstract class Connector {
      * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
      *                    churn in partition to task assignments
      */
-    public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) {
+    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
         context = ctx;
         // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
         // are very different, but reduces the difficulty of implementing a Connector
@@ -81,17 +81,17 @@ public abstract class Connector {
      *
      * @param props configuration settings
      */
-    public abstract void start(Properties props);
+    public abstract void start(Map<String, String> props);
 
     /**
      * Reconfigure this Connector. Most implementations will not override this, using the default
-     * implementation that calls {@link #stop()} followed by {@link #start(Properties)}.
+     * implementation that calls {@link #stop()} followed by {@link #start(Map)}.
      * Implementations only need to override this if they want to handle this process more
      * efficiently, e.g. without shutting down network connections to the external system.
      *
      * @param props new configuration settings
      */
-    public void reconfigure(Properties props) {
+    public void reconfigure(Map<String, String> props) {
         stop();
         start(props);
     }
@@ -108,7 +108,7 @@ public abstract class Connector {
      * @param maxTasks maximum number of configurations to generate
      * @return configurations for Tasks
      */
-    public abstract List<Properties> taskConfigs(int maxTasks);
+    public abstract List<Map<String, String>> taskConfigs(int maxTasks);
 
     /**
      * Stop this connector.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
index cdaba08..2a8c98c 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
@@ -19,7 +19,7 @@ package org.apache.kafka.copycat.connector;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * <p>
@@ -40,7 +40,7 @@ public interface Task {
      * Start the Task
      * @param props initial configuration
      */
-    void start(Properties props);
+    void start(Map<String, String> props);
 
     /**
      * Stop this task.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
index 7c03cda..b2d5ff6 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
@@ -23,7 +23,6 @@ import org.apache.kafka.copycat.connector.Task;
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.Properties;
 
 /**
  * SinkTask is a Task takes records loaded from Kafka and sends them to another system. In
@@ -52,7 +51,7 @@ public abstract class SinkTask implements Task {
      * @param props initial configuration
      */
     @Override
-    public abstract void start(Properties props);
+    public abstract void start(Map<String, String> props);
 
     /**
      * Put the records in the sink. Usually this should send the records to the sink asynchronously

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
index 30cbf16..841943f 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.copycat.connector.Task;
 
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
@@ -43,7 +43,7 @@ public abstract class SourceTask implements Task {
      * @param props initial configuration
      */
     @Override
-    public abstract void start(Properties props);
+    public abstract void start(Map<String, String> props);
 
     /**
      * Poll this SourceTask for new records. This method should block if no data is currently

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
index cbaf866..79ddfd7 100644
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
@@ -20,8 +20,9 @@ package org.apache.kafka.copycat.connector;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -30,7 +31,7 @@ public class ConnectorReconfigurationTest {
     @Test
     public void testDefaultReconfigure() throws Exception {
         TestConnector conn = new TestConnector(false);
-        conn.reconfigure(new Properties());
+        conn.reconfigure(Collections.<String, String>emptyMap());
         assertEquals(conn.stopOrder, 0);
         assertEquals(conn.configureOrder, 1);
     }
@@ -38,7 +39,7 @@ public class ConnectorReconfigurationTest {
     @Test(expected = CopycatException.class)
     public void testReconfigureStopException() throws Exception {
         TestConnector conn = new TestConnector(true);
-        conn.reconfigure(new Properties());
+        conn.reconfigure(Collections.<String, String>emptyMap());
     }
 
     private static class TestConnector extends Connector {
@@ -52,7 +53,7 @@ public class ConnectorReconfigurationTest {
         }
 
         @Override
-        public void start(Properties props) {
+        public void start(Map<String, String> props) {
             configureOrder = order++;
         }
 
@@ -62,7 +63,7 @@ public class ConnectorReconfigurationTest {
         }
 
         @Override
-        public List<Properties> taskConfigs(int count) {
+        public List<Map<String, String>> taskConfigs(int count) {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
index 6e2b04d..763f638 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
@@ -21,8 +21,9 @@ import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.sink.SinkConnector;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * Very simple connector that works with the console. This connector supports both source and
@@ -34,8 +35,8 @@ public class FileStreamSinkConnector extends SinkConnector {
     private String filename;
 
     @Override
-    public void start(Properties props) {
-        filename = props.getProperty(FILE_CONFIG);
+    public void start(Map<String, String> props) {
+        filename = props.get(FILE_CONFIG);
     }
 
     @Override
@@ -44,12 +45,12 @@ public class FileStreamSinkConnector extends SinkConnector {
     }
 
     @Override
-    public List<Properties> taskConfigs(int maxTasks) {
-        ArrayList<Properties> configs = new ArrayList<>();
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        ArrayList<Map<String, String>> configs = new ArrayList<>();
         for (int i = 0; i < maxTasks; i++) {
-            Properties config = new Properties();
+            Map<String, String> config = new HashMap<>();
             if (filename != null)
-                config.setProperty(FILE_CONFIG, filename);
+                config.put(FILE_CONFIG, filename);
             configs.add(config);
         }
         return configs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
index 6dfe4a7..5286d2b 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
@@ -30,7 +30,6 @@ import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Properties;
 
 /**
  * FileStreamSinkTask writes records to stdout or a file.
@@ -51,8 +50,8 @@ public class FileStreamSinkTask extends SinkTask {
     }
 
     @Override
-    public void start(Properties props) {
-        filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG);
+    public void start(Map<String, String> props) {
+        filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
         if (filename == null) {
             outputStream = System.out;
         } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
index 716322f..9784bb1 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
@@ -22,8 +22,9 @@ import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.source.SourceConnector;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * Very simple connector that works with the console. This connector supports both source and
@@ -37,9 +38,9 @@ public class FileStreamSourceConnector extends SourceConnector {
     private String topic;
 
     @Override
-    public void start(Properties props) {
-        filename = props.getProperty(FILE_CONFIG);
-        topic = props.getProperty(TOPIC_CONFIG);
+    public void start(Map<String, String> props) {
+        filename = props.get(FILE_CONFIG);
+        topic = props.get(TOPIC_CONFIG);
         if (topic == null || topic.isEmpty())
             throw new CopycatException("FileStreamSourceConnector configuration must include 'topic' setting");
         if (topic.contains(","))
@@ -52,13 +53,13 @@ public class FileStreamSourceConnector extends SourceConnector {
     }
 
     @Override
-    public List<Properties> taskConfigs(int maxTasks) {
-        ArrayList<Properties> configs = new ArrayList<>();
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        ArrayList<Map<String, String>> configs = new ArrayList<>();
         // Only one input stream makes sense.
-        Properties config = new Properties();
+        Map<String, String> config = new HashMap<>();
         if (filename != null)
-            config.setProperty(FILE_CONFIG, filename);
-        config.setProperty(TOPIC_CONFIG, topic);
+            config.put(FILE_CONFIG, filename);
+        config.put(TOPIC_CONFIG, topic);
         configs.add(config);
         return configs;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index f2249d0..70eef5c 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -46,15 +46,15 @@ public class FileStreamSourceTask extends SourceTask {
     private Long streamOffset;
 
     @Override
-    public void start(Properties props) {
-        filename = props.getProperty(FileStreamSourceConnector.FILE_CONFIG);
+    public void start(Map<String, String> props) {
+        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
         if (filename == null || filename.isEmpty()) {
             stream = System.in;
             // Tracking offset for stdin doesn't make sense
             streamOffset = null;
             reader = new BufferedReader(new InputStreamReader(stream));
         }
-        topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG);
+        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
         if (topic == null)
             throw new CopycatException("FileStreamSourceTask config missing topic setting");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
index ab5fd3b..b30856f 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
@@ -25,8 +25,9 @@ import org.junit.Test;
 import org.powermock.api.easymock.PowerMock;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -42,7 +43,7 @@ public class FileStreamSinkConnectorTest {
 
     private FileStreamSinkConnector connector;
     private ConnectorContext ctx;
-    private Properties sinkProperties;
+    private Map<String, String> sinkProperties;
 
     @Before
     public void setup() {
@@ -50,9 +51,9 @@ public class FileStreamSinkConnectorTest {
         ctx = PowerMock.createMock(ConnectorContext.class);
         connector.initialize(ctx);
 
-        sinkProperties = new Properties();
-        sinkProperties.setProperty(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
-        sinkProperties.setProperty(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
+        sinkProperties = new HashMap<>();
+        sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
+        sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
     }
 
     @Test
@@ -60,14 +61,14 @@ public class FileStreamSinkConnectorTest {
         PowerMock.replayAll();
 
         connector.start(sinkProperties);
-        List<Properties> taskConfigs = connector.taskConfigs(1);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
         assertEquals(1, taskConfigs.size());
-        assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG));
+        assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
 
         taskConfigs = connector.taskConfigs(2);
         assertEquals(2, taskConfigs.size());
         for (int i = 0; i < 2; i++) {
-            assertEquals(FILENAME, taskConfigs.get(0).getProperty(FileStreamSinkConnector.FILE_CONFIG));
+            assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
         }
 
         PowerMock.verifyAll();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
index 41e15a0..28bfa62 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
@@ -23,8 +23,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.powermock.api.easymock.PowerMock;
 
+import java.util.HashMap;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -37,7 +38,7 @@ public class FileStreamSourceConnectorTest {
 
     private FileStreamSourceConnector connector;
     private ConnectorContext ctx;
-    private Properties sourceProperties;
+    private Map<String, String> sourceProperties;
 
     @Before
     public void setup() {
@@ -45,9 +46,9 @@ public class FileStreamSourceConnectorTest {
         ctx = PowerMock.createMock(ConnectorContext.class);
         connector.initialize(ctx);
 
-        sourceProperties = new Properties();
-        sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
-        sourceProperties.setProperty(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
+        sourceProperties = new HashMap<>();
+        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
+        sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
     }
 
     @Test
@@ -55,20 +56,20 @@ public class FileStreamSourceConnectorTest {
         PowerMock.replayAll();
 
         connector.start(sourceProperties);
-        List<Properties> taskConfigs = connector.taskConfigs(1);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
         assertEquals(1, taskConfigs.size());
         assertEquals(FILENAME,
-                taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
+                taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
         assertEquals(SINGLE_TOPIC,
-                taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG));
+                taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
 
         // Should be able to return fewer than requested #
         taskConfigs = connector.taskConfigs(2);
         assertEquals(1, taskConfigs.size());
         assertEquals(FILENAME,
-                taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
+                taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
         assertEquals(SINGLE_TOPIC,
-                taskConfigs.get(0).getProperty(FileStreamSourceConnector.TOPIC_CONFIG));
+                taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
 
         PowerMock.verifyAll();
     }
@@ -79,16 +80,16 @@ public class FileStreamSourceConnectorTest {
 
         sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
         connector.start(sourceProperties);
-        List<Properties> taskConfigs = connector.taskConfigs(1);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
         assertEquals(1, taskConfigs.size());
-        assertNull(taskConfigs.get(0).getProperty(FileStreamSourceConnector.FILE_CONFIG));
+        assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
 
         PowerMock.verifyAll();
     }
 
     @Test(expected = CopycatException.class)
     public void testMultipleSourcesInvalid() {
-        sourceProperties.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
+        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
         connector.start(sourceProperties);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
index 4365def..ddf8e43 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
@@ -31,9 +31,9 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
@@ -42,7 +42,7 @@ public class FileStreamSourceTaskTest {
     private static final String TOPIC = "test";
 
     private File tempFile;
-    private Properties config;
+    private Map<String, String> config;
     private OffsetStorageReader offsetStorageReader;
     private SourceTaskContext context;
     private FileStreamSourceTask task;
@@ -52,9 +52,9 @@ public class FileStreamSourceTaskTest {
     @Before
     public void setup() throws IOException {
         tempFile = File.createTempFile("file-stream-source-task-test", null);
-        config = new Properties();
-        config.setProperty(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
-        config.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
+        config = new HashMap<>();
+        config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
+        config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
         task = new FileStreamSourceTask();
         offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
         context = PowerMock.createMock(SourceTaskContext.class);
@@ -135,7 +135,7 @@ public class FileStreamSourceTaskTest {
     }
 
     public void testInvalidFile() throws InterruptedException {
-        config.setProperty(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
+        config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
         task.start(config);
         // Currently the task retries indefinitely if the file isn't found, but shouldn't return any data.
         for (int i = 0; i < 100; i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
index ca3f76c..8dfefaa 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
@@ -28,7 +28,8 @@ import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Properties;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * <p>
@@ -44,15 +45,14 @@ public class CopycatDistributed {
     private static final Logger log = LoggerFactory.getLogger(CopycatDistributed.class);
 
     public static void main(String[] args) throws Exception {
-        Properties workerProps;
-
         if (args.length < 1) {
             log.info("Usage: CopycatDistributed worker.properties");
             System.exit(1);
         }
 
         String workerPropsFile = args[0];
-        workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
+        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
         DistributedConfig config = new DistributedConfig(workerProps);
         Worker worker = new Worker(config, new KafkaOffsetBackingStore());

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
index cd4fc96..3869552 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
@@ -34,7 +34,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
-import java.util.Properties;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * <p>
@@ -52,8 +53,6 @@ public class CopycatStandalone {
     private static final Logger log = LoggerFactory.getLogger(CopycatStandalone.class);
 
     public static void main(String[] args) throws Exception {
-        Properties workerProps;
-        Properties connectorProps;
 
         if (args.length < 2) {
             log.info("Usage: CopycatStandalone worker.properties connector1.properties [connector2.properties ...]");
@@ -61,7 +60,8 @@ public class CopycatStandalone {
         }
 
         String workerPropsFile = args[0];
-        workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
+        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
         StandaloneConfig config = new StandaloneConfig(workerProps);
         Worker worker = new Worker(config, new FileOffsetBackingStore());
@@ -72,7 +72,7 @@ public class CopycatStandalone {
 
         try {
             for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
-                connectorProps = Utils.loadProps(connectorPropsFile);
+                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
                 FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
                     @Override
                     public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
@@ -83,8 +83,8 @@ public class CopycatStandalone {
                     }
                 });
                 herder.putConnectorConfig(
-                        connectorProps.getProperty(ConnectorConfig.NAME_CONFIG),
-                        Utils.propsToStringMap(connectorProps), false, cb);
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
                 cb.get();
             }
         } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index de9f533..08eab86 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -38,7 +38,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 
 /**
@@ -89,15 +88,12 @@ public class Worker {
     public void start() {
         log.info("Worker starting");
 
-        Properties unusedConfigs = config.unusedProperties();
-
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        for (String propName : unusedConfigs.stringPropertyNames()) {
-            producerProps.put(propName, unusedConfigs.getProperty(propName));
-        }
+        producerProps.putAll(config.unusedConfigs());
+
         producer = new KafkaProducer<>(producerProps);
 
         offsetBackingStore.start();
@@ -177,10 +173,7 @@ public class Worker {
         final Connector connector = instantiateConnector(connClass);
         connector.initialize(ctx);
         try {
-            Map<String, Object> originals = connConfig.originals();
-            Properties props = new Properties();
-            props.putAll(originals);
-            connector.start(props);
+            connector.start(connConfig.originalsStrings());
         } catch (CopycatException e) {
             throw new CopycatException("Connector threw an exception while starting", e);
         }
@@ -209,8 +202,8 @@ public class Worker {
 
         List<Map<String, String>> result = new ArrayList<>();
         String taskClassName = connector.taskClass().getName();
-        for (Properties taskProps : connector.taskConfigs(maxTasks)) {
-            Map<String, String> taskConfig = Utils.propsToStringMap(taskProps);
+        for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
+            Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config
             taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
             if (sinkTopics != null)
                 taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
@@ -280,9 +273,7 @@ public class Worker {
 
         // Start the task before adding modifying any state, any exceptions are caught higher up the
         // call chain and there's no cleanup to do here
-        Properties props = new Properties();
-        props.putAll(taskConfig.originals());
-        workerTask.start(props);
+        workerTask.start(taskConfig.originalsStrings());
         if (task instanceof SourceTask) {
             WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
             sourceTaskOffsetCommitter.schedule(id, workerSourceTask);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
index 0c6a6f6..b962d54 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * Common base class providing configuration for Copycat workers, whether standalone or distributed.
@@ -132,7 +132,7 @@ public class WorkerConfig extends AbstractConfig {
                 .define(REST_ADVERTISED_PORT_CONFIG, Type.INT,  null, Importance.LOW, REST_ADVERTISED_PORT_DOC);
     }
 
-    public WorkerConfig(ConfigDef definition, Properties props) {
+    public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         super(definition, props);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index dc51730..e9193b8 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -44,7 +44,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -60,7 +59,7 @@ class WorkerSinkTask implements WorkerTask {
     private final Converter keyConverter;
     private final Converter valueConverter;
     private WorkerSinkTaskThread workThread;
-    private Properties taskProps;
+    private Map<String, String> taskProps;
     private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskContext context;
     private boolean started;
@@ -78,7 +77,7 @@ class WorkerSinkTask implements WorkerTask {
     }
 
     @Override
-    public void start(Properties props) {
+    public void start(Map<String, String> props) {
         taskProps = props;
         consumer = createConsumer();
         context = new WorkerSinkTaskContext(consumer);
@@ -126,7 +125,7 @@ class WorkerSinkTask implements WorkerTask {
      * @returns true if successful, false if joining the consumer group was interrupted
      */
     public boolean joinConsumerGroupAndStart() {
-        String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
+        String topicsStr = taskProps.get(SinkTask.TOPICS_CONFIG);
         if (topicsStr == null || topicsStr.isEmpty())
             throw new CopycatException("Sink tasks require a list of topics.");
         String[] topics = topicsStr.split(",");
@@ -222,14 +221,14 @@ class WorkerSinkTask implements WorkerTask {
     private KafkaConsumer<byte[], byte[]> createConsumer() {
         // Include any unknown worker configs so consumer configs can be set globally on the worker
         // and through to the task
-        Properties props = workerConfig.unusedProperties();
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector());
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        Map<String, Object> props = workerConfig.unusedConfigs();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.connector());
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                 Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
         KafkaConsumer<byte[], byte[]> newConsumer;
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index 1f96c78..cdb41b0 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.IdentityHashMap;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -85,7 +85,7 @@ class WorkerSourceTask implements WorkerTask {
     }
 
     @Override
-    public void start(Properties props) {
+    public void start(Map<String, String> props) {
         workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id, props);
         workThread.start();
     }
@@ -273,11 +273,11 @@ class WorkerSourceTask implements WorkerTask {
 
 
     private class WorkerSourceTaskThread extends ShutdownableThread {
-        private Properties workerProps;
+        private Map<String, String> workerProps;
         private boolean finishedStart;
         private boolean startedShutdownBeforeStartCompleted;
 
-        public WorkerSourceTaskThread(String name, Properties workerProps) {
+        public WorkerSourceTaskThread(String name, Map<String, String> workerProps) {
             super(name);
             this.workerProps = workerProps;
             this.finishedStart = false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
index af225bb..0759efe 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.copycat.runtime;
 
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * Handles processing for an individual task. This interface only provides the basic methods
@@ -29,7 +29,7 @@ interface WorkerTask {
      * Start the Task
      * @param props initial configuration
      */
-    void start(Properties props);
+    void start(Map<String, String> props);
 
     /**
      * Stop this task from processing messages. This method does not block, it only triggers

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
index 90d63cf..a2848b1 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.copycat.runtime.WorkerConfig;
 
-import java.util.Properties;
+import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 
@@ -180,7 +180,7 @@ public class DistributedConfig extends WorkerConfig {
                         WORKER_UNSYNC_BACKOFF_MS_DOC);
     }
 
-    public DistributedConfig(Properties props) {
+    public DistributedConfig(Map<String, String> props) {
         super(CONFIG, props);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
index 246d36d..6e547d3 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConfig.java
@@ -20,7 +20,7 @@ package org.apache.kafka.copycat.runtime.standalone;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.copycat.runtime.WorkerConfig;
 
-import java.util.Properties;
+import java.util.Map;
 
 public class StandaloneConfig extends WorkerConfig {
     private static final ConfigDef CONFIG;
@@ -29,7 +29,7 @@ public class StandaloneConfig extends WorkerConfig {
         CONFIG = baseConfigDef();
     }
 
-    public StandaloneConfig(Properties props) {
+    public StandaloneConfig(Map<String, String> props) {
         super(CONFIG, props);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 177f7a6..7905736 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -56,7 +56,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -86,7 +85,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
     private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
 
-    private static final Properties TASK_PROPS = new Properties();
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
     static {
         TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
     }
@@ -111,13 +110,13 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     public void setup() {
         super.setup();
         time = new MockTime();
-        Properties workerProps = new Properties();
-        workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("internal.key.converter.schemas.enable", "false");
-        workerProps.setProperty("internal.value.converter.schemas.enable", "false");
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 452c5cb..0fa14bd 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -82,7 +82,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
 
-    private static final Properties EMPTY_TASK_PROPS = new Properties();
+    private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
     private static final List<SourceRecord> RECORDS = Arrays.asList(
             new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
     );
@@ -90,13 +90,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Override
     public void setup() {
         super.setup();
-        Properties workerProps = new Properties();
-        workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("internal.key.converter.schemas.enable", "false");
-        workerProps.setProperty("internal.value.converter.schemas.enable", "false");
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
         config = new StandaloneConfig(workerProps);
         producerCallbacks = EasyMock.newCapture();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 05015a4..f99c711 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.connector.Task;
@@ -46,10 +45,10 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -70,13 +69,13 @@ public class WorkerTest extends ThreadedTest {
     public void setup() {
         super.setup();
 
-        Properties workerProps = new Properties();
-        workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("internal.key.converter.schemas.enable", "false");
-        workerProps.setProperty("internal.value.converter.schemas.enable", "false");
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
         config = new StandaloneConfig(workerProps);
     }
 
@@ -94,7 +93,7 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.mockStatic(Worker.class);
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
 
-        Properties props = new Properties();
+        Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
@@ -117,7 +116,7 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
 
-        ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+        ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         worker.addConnector(config, ctx);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
@@ -164,7 +163,7 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.mockStatic(Worker.class);
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
 
-        Properties props = new Properties();
+        Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
@@ -177,8 +176,8 @@ public class WorkerTest extends ThreadedTest {
 
         // Reconfigure
         EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
-        Properties taskProps = new Properties();
-        taskProps.setProperty("foo", "bar");
+        Map<String, String> taskProps = new HashMap<>();
+        taskProps.put("foo", "bar");
         EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));
 
         // Remove
@@ -193,7 +192,7 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
 
-        ConnectorConfig config = new ConnectorConfig(Utils.propsToStringMap(props));
+        ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
         worker.addConnector(config, ctx);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
@@ -204,10 +203,10 @@ public class WorkerTest extends ThreadedTest {
             // expected
         }
         List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
-        Properties expectedTaskProps = new Properties();
-        expectedTaskProps.setProperty("foo", "bar");
-        expectedTaskProps.setProperty(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
-        expectedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, "foo,bar");
+        Map<String, String> expectedTaskProps = new HashMap<>();
+        expectedTaskProps.put("foo", "bar");
+        expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+        expectedTaskProps.put(SinkTask.TOPICS_CONFIG, "foo,bar");
         assertEquals(2, taskConfigs.size());
         assertEquals(expectedTaskProps, taskConfigs.get(0));
         assertEquals(expectedTaskProps, taskConfigs.get(1));
@@ -246,7 +245,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(WorkerConfig.class),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
-        Properties origProps = new Properties();
+        Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.start(origProps);
         EasyMock.expectLastCall();
@@ -266,7 +265,7 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.addTask(taskId, new TaskConfig(Utils.propsToStringMap(origProps)));
+        worker.addTask(taskId, new TaskConfig(origProps));
         assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
         worker.stopTask(taskId);
         assertEquals(Collections.emptySet(), worker.taskIds());
@@ -315,7 +314,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.anyObject(WorkerConfig.class),
                 EasyMock.anyObject(Time.class))
                 .andReturn(workerTask);
-        Properties origProps = new Properties();
+        Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.start(origProps);
         EasyMock.expectLastCall();
@@ -335,7 +334,7 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.addTask(TASK_ID, new TaskConfig(Utils.propsToStringMap(origProps)));
+        worker.addTask(TASK_ID, new TaskConfig(origProps));
         worker.stop();
 
         PowerMock.verifyAll();
@@ -344,7 +343,7 @@ public class WorkerTest extends ThreadedTest {
 
     private static class TestConnector extends Connector {
         @Override
-        public void start(Properties props) {
+        public void start(Map<String, String> props) {
 
         }
 
@@ -354,7 +353,7 @@ public class WorkerTest extends ThreadedTest {
         }
 
         @Override
-        public List<Properties> taskConfigs(int maxTasks) {
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
             return null;
         }
 
@@ -369,7 +368,7 @@ public class WorkerTest extends ThreadedTest {
         }
 
         @Override
-        public void start(Properties props) {
+        public void start(Map<String, String> props) {
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/c006c591/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
index 7873447..512cb5c 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -55,7 +55,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
@@ -65,7 +64,7 @@ import static org.junit.Assert.assertTrue;
 @PrepareForTest(DistributedHerder.class)
 @PowerMockIgnore("javax.management.*")
 public class DistributedHerderTest {
-    private static final Properties HERDER_CONFIG = new Properties();
+    private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
     static {
         HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
         HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");


Mime
View raw message