kafka-commits mailing list archives

From: gwens...@apache.org
Subject: [01/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
Date: Mon, 09 Nov 2015 06:11:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 48013222f -> 417e283d6


http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_distributed_test.py b/tests/kafkatest/tests/connect_distributed_test.py
new file mode 100644
index 0000000..55901c2
--- /dev/null
+++ b/tests/kafkatest/tests/connect_distributed_test.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.connect import ConnectDistributedService
+from ducktape.utils.util import wait_until
+import subprocess, itertools
+
+class ConnectDistributedFileTest(KafkaTest):
+    """
+    Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
+    another, validating the total output is identical to the input.
+    """
+
+    INPUT_FILE = "/mnt/connect.input"
+    OUTPUT_FILE = "/mnt/connect.output"
+
+    TOPIC = "test"
+    OFFSETS_TOPIC = "connect-offsets"
+    CONFIG_TOPIC = "connect-configs"
+
+    # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
+    # across all nodes.
+    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
+    FIRST_INPUTS = "\n".join(FIRST_INPUT_LIST) + "\n"
+    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
+    SECOND_INPUTS = "\n".join(SECOND_INPUT_LIST) + "\n"
+
+    SCHEMA = { "type": "string", "optional": False }
+
+    def __init__(self, test_context):
+        super(ConnectDistributedFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'test' : { 'partitions': 1, 'replication-factor': 1 }
+        })
+
+        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+
+    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
+        assert converter is not None, "converter type must be set"
+        # Template parameters
+        self.key_converter = converter
+        self.value_converter = converter
+        self.schemas = schemas
+
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+
+        self.cc.start()
+
+        self.logger.info("Creating connectors")
+        for connector_props in [self.render("connect-file-source.properties"), self.render("connect-file-sink.properties")]:
+            connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+            self.cc.create_connector(connector_config)
+
+        # Generating data on the source node should generate new records and create new output on the sink node. Timeouts
+        # here need to be more generous than they are for standalone mode because a) it takes longer to write configs,
+        # do rebalancing of the group, etc., and b) without explicit leave-group support, rebalancing takes a while
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+        # Restarting both should result in them picking up where they left off,
+        # only processing new data.
+        self.cc.restart()
+
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file")
+
+    def validate_output(self, input):
+        input_set = set(input)
+        # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
+        # Between the first and second rounds, we might even end up with half the data on each node.
+        output_set = set(itertools.chain(*[
+            [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
+        ]))
+        return input_set == output_set
+
+
+    def file_contents(self, node, file):
+        try:
+            # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
+            # immediately
+            return list(node.account.ssh_capture("cat " + file))
+        except subprocess.CalledProcessError:
+            return []
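
The connector configs above are created by parsing rendered properties text with a one-line dict build: keep every non-blank, non-comment line and split it on the first '='. A minimal standalone sketch of that parsing, runnable outside ducktape (the sample properties text just mirrors the templates later in this commit):

    def config_dict_from_props(connector_props):
        # Keep only non-blank, non-comment lines and split each on the first '='
        # so values that themselves contain '=' survive intact.
        return dict(line.strip().split('=', 1)
                    for line in connector_props.split('\n')
                    if line.strip() and not line.strip().startswith('#'))

    props = """
    # comments and blank lines are ignored
    name=local-file-source
    connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
    tasks.max=1
    file=/mnt/connect.input
    topic=test
    """
    print(config_dict_from_props(props))
    # {'name': 'local-file-source', 'connector.class': '...', 'tasks.max': '1',
    #  'file': '/mnt/connect.input', 'topic': 'test'}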

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_rest_test.py b/tests/kafkatest/tests/connect_rest_test.py
new file mode 100644
index 0000000..8e713d4
--- /dev/null
+++ b/tests/kafkatest/tests/connect_rest_test.py
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
+from ducktape.utils.util import wait_until
+import subprocess, json, itertools
+
+class ConnectRestApiTest(KafkaTest):
+    """
+    Test of Kafka Connect's REST API endpoints.
+    """
+
+    INPUT_FILE = "/mnt/connect.input"
+    INPUT_FILE2 = "/mnt/connect.input2"
+    OUTPUT_FILE = "/mnt/connect.output"
+
+    TOPIC = "test"
+    OFFSETS_TOPIC = "connect-offsets"
+    CONFIG_TOPIC = "connect-configs"
+
+    # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
+    # across all nodes.
+    INPUT_LIST = ["foo", "bar", "baz"]
+    INPUTS = "\n".join(INPUT_LIST) + "\n"
+    LONGER_INPUT_LIST = ["foo", "bar", "baz", "razz", "ma", "tazz"]
+    LONGER_INPUTS = "\n".join(LONGER_INPUT_LIST) + "\n"
+
+    SCHEMA = { "type": "string", "optional": False }
+
+    def __init__(self, test_context):
+        super(ConnectRestApiTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'test' : { 'partitions': 1, 'replication-factor': 1 }
+        })
+
+        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
+
+    def test_rest_api(self):
+        # Template parameters
+        self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.schemas = True
+
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+
+        self.cc.start()
+
+        assert self.cc.list_connectors() == []
+
+        self.logger.info("Creating connectors")
+        source_connector_props = self.render("connect-file-source.properties")
+        sink_connector_props = self.render("connect-file-sink.properties")
+        for connector_props in [source_connector_props, sink_connector_props]:
+            connector_config = self._config_dict_from_props(connector_props)
+            self.cc.create_connector(connector_config)
+
+        # We should see the connectors appear
+        wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]),
+                   timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing")
+
+        # We'll only do very simple validation that the connectors and tasks really ran.
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.INPUTS) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+
+        # Trying to create the same connector again should cause an error
+        try:
+            self.cc.create_connector(self._config_dict_from_props(source_connector_props))
+            assert False, "creating the same connector should have caused a conflict"
+        except ConnectRestError:
+            pass # expected
+
+        # Validate that we can get info about connectors
+        expected_source_info = {
+            'name': 'local-file-source',
+            'config': self._config_dict_from_props(source_connector_props),
+            'tasks': [{ 'connector': 'local-file-source', 'task': 0 }]
+        }
+        source_info = self.cc.get_connector("local-file-source")
+        assert expected_source_info == source_info, "Incorrect info: " + json.dumps(source_info)
+        source_config = self.cc.get_connector_config("local-file-source")
+        assert expected_source_info['config'] == source_config, "Incorrect config: " + json.dumps(source_config)
+        expected_sink_info = {
+            'name': 'local-file-sink',
+            'config': self._config_dict_from_props(sink_connector_props),
+            'tasks': [{ 'connector': 'local-file-sink', 'task': 0 }]
+        }
+        sink_info = self.cc.get_connector("local-file-sink")
+        assert expected_sink_info == sink_info, "Incorrect info: " + json.dumps(sink_info)
+        sink_config = self.cc.get_connector_config("local-file-sink")
+        assert expected_sink_info['config'] == sink_config, "Incorrect config: " + json.dumps(sink_config)
+
+
+        # Validate that we can get info about tasks. This info should definitely be available now without waiting since
+        # we've already seen data appear in files.
+        # TODO: It would be nice to validate a complete listing, but that doesn't make sense for the file connectors
+        expected_source_task_info = [{
+            'id': { 'connector': 'local-file-source', 'task': 0 },
+            'config': {
+                'task.class': 'org.apache.kafka.connect.file.FileStreamSourceTask',
+                'file': self.INPUT_FILE,
+                'topic': self.TOPIC
+            }
+        }]
+        source_task_info = self.cc.get_connector_tasks("local-file-source")
+        assert expected_source_task_info == source_task_info, "Incorrect info: " + json.dumps(source_task_info)
+        expected_sink_task_info = [{
+            'id': { 'connector': 'local-file-sink', 'task': 0 },
+            'config': {
+                'task.class': 'org.apache.kafka.connect.file.FileStreamSinkTask',
+                'file': self.OUTPUT_FILE,
+                'topics': self.TOPIC
+            }
+        }]
+        sink_task_info = self.cc.get_connector_tasks("local-file-sink")
+        assert expected_sink_task_info == sink_task_info, "Incorrect info: " + json.dumps(sink_task_info)
+
+        file_source_config = self._config_dict_from_props(source_connector_props)
+        file_source_config['file'] = self.INPUT_FILE2
+        self.cc.set_connector_config("local-file-source", file_source_config)
+
+        # We should also be able to verify that the modified configs caused the tasks to move to the new file and pick up
+        # more data.
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.LONGER_INPUTS) + " >> " + self.INPUT_FILE2)
+        wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+        self.cc.delete_connector("local-file-source")
+        self.cc.delete_connector("local-file-sink")
+        wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
+
+    def validate_output(self, input):
+        input_set = set(input)
+        # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
+        output_set = set(itertools.chain(*[
+            [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
+            ]))
+        return input_set == output_set
+
+
+    def file_contents(self, node, file):
+        try:
+            # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
+            # immediately
+            return list(node.account.ssh_capture("cat " + file))
+        except subprocess.CalledProcessError:
+            return []
+
+    def _config_dict_from_props(self, connector_props):
+        return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
\ No newline at end of file
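
The ConnectDistributedService helpers exercised above (list_connectors, create_connector, get_connector, get_connector_config, set_connector_config, get_connector_tasks, delete_connector) are thin wrappers over the worker's REST API. A rough sketch of the HTTP calls they correspond to, written with the requests library against the standard Connect endpoints; the base URL is hypothetical (8083 is the Connect default port), and the real service class may handle errors and retries differently:

    import requests

    BASE = "http://connect-worker:8083"  # hypothetical worker address

    def list_connectors():
        return requests.get(BASE + "/connectors").json()

    def create_connector(config):
        # The create request carries the connector name plus its full config map;
        # submitting a duplicate name is rejected (hence ConnectRestError above).
        resp = requests.post(BASE + "/connectors",
                             json={"name": config["name"], "config": config})
        resp.raise_for_status()
        return resp.json()

    def get_connector(name):
        return requests.get(BASE + "/connectors/%s" % name).json()

    def get_connector_config(name):
        return requests.get(BASE + "/connectors/%s/config" % name).json()

    def set_connector_config(name, config):
        return requests.put(BASE + "/connectors/%s/config" % name, json=config).json()

    def get_connector_tasks(name):
        return requests.get(BASE + "/connectors/%s/tasks" % name).json()

    def delete_connector(name):
        requests.delete(BASE + "/connectors/%s" % name).raise_for_status()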

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_test.py b/tests/kafkatest/tests/connect_test.py
new file mode 100644
index 0000000..90f219a
--- /dev/null
+++ b/tests/kafkatest/tests/connect_test.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.connect import ConnectStandaloneService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
+import hashlib, subprocess, json
+
+class ConnectStandaloneFileTest(KafkaTest):
+    """
+    Simple test of Kafka Connect that produces data from a file in one
+    standalone process and consumes it on another, validating the output is
+    identical to the input.
+    """
+
+    INPUT_FILE = "/mnt/connect.input"
+    OUTPUT_FILE = "/mnt/connect.output"
+
+    OFFSETS_FILE = "/mnt/connect.offsets"
+
+    TOPIC = "test"
+
+    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
+    FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
+    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
+    SECOND_INPUT = "\n".join(SECOND_INPUT_LIST) + "\n"
+
+    SCHEMA = { "type": "string", "optional": False }
+
+    def __init__(self, test_context):
+        super(ConnectStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'test' : { 'partitions': 1, 'replication-factor': 1 }
+        })
+
+        self.source = ConnectStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
+        self.sink = ConnectStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
+
+    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
+    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
+    @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
+    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
+        assert converter is not None, "converter type must be set"
+        # Template parameters
+        self.key_converter = converter
+        self.value_converter = converter
+        self.schemas = schemas
+
+        self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
+        self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])
+
+        self.source.start()
+        self.sink.start()
+
+        # Generating data on the source node should generate new records and create new output on the sink node
+        self.source.node.account.ssh("echo -e -n " + repr(self.FIRST_INPUT) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+        # Restarting both should result in them picking up where they left off,
+        # only processing new data.
+        self.source.restart()
+        self.sink.restart()
+
+        self.source.node.account.ssh("echo -e -n " + repr(self.SECOND_INPUT) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT + self.SECOND_INPUT), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
+
+        # Validate the format of the data in the Kafka topic
+        self.consumer_validator.run()
+        expected = json.dumps([line if not self.schemas else { "schema": self.SCHEMA, "payload": line } for line in self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST])
+        decoder = (json.loads if converter.endswith("JsonConverter") else str)
+        actual = json.dumps([decoder(x) for x in self.consumer_validator.messages_consumed[1]])
+        assert expected == actual, "Expected %s but saw %s in Kafka" % (expected, actual)
+
+    def validate_output(self, value):
+        try:
+            output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
+            return output_hash == hashlib.md5(value).hexdigest()
+        except subprocess.CalledProcessError:
+            return False
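
The final assertion in test_file_source_and_sink depends on the JsonConverter wire format: with schemas enabled, every record on the topic is a JSON envelope pairing the payload with its schema. A small sketch of how the expected topic contents and the md5-based file check are computed (Python 3 here, so the hash input is encoded explicitly; the test itself is Python 2 era, where hashlib.md5 accepts str directly):

    import hashlib, json

    SCHEMA = {"type": "string", "optional": False}
    lines = ["foo", "bar", "baz", "razz", "ma", "tazz"]

    def expected_messages(schemas):
        # JsonConverter with schemas enabled wraps each value in an envelope;
        # with schemas disabled (or with StringConverter) the bare value is expected.
        if schemas:
            return [{"schema": SCHEMA, "payload": line} for line in lines]
        return lines

    print(json.dumps(expected_messages(schemas=True)[0]))
    # {"schema": {"type": "string", "optional": false}, "payload": "foo"}

    # The sink-side check in validate_output is just an md5 over the full input text:
    value = "\n".join(lines) + "\n"
    print(hashlib.md5(value.encode("utf-8")).hexdigest())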

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/copycat_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py
deleted file mode 100644
index 10aa5e2..0000000
--- a/tests/kafkatest/tests/copycat_distributed_test.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.copycat import CopycatDistributedService
-from ducktape.utils.util import wait_until
-import hashlib, subprocess, json, itertools
-
-class CopycatDistributedFileTest(KafkaTest):
-    """
-    Simple test of Copycat in distributed mode, producing data from files on on Copycat cluster and consuming it on
-    another, validating the total output is identical to the input.
-    """
-
-    INPUT_FILE = "/mnt/copycat.input"
-    OUTPUT_FILE = "/mnt/copycat.output"
-
-    TOPIC = "test"
-    OFFSETS_TOPIC = "copycat-offsets"
-    CONFIG_TOPIC = "copycat-configs"
-
-    # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
-    # across all nodes.
-    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
-    FIRST_INPUTS = "\n".join(FIRST_INPUT_LIST) + "\n"
-    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
-    SECOND_INPUTS = "\n".join(SECOND_INPUT_LIST) + "\n"
-
-    SCHEMA = { "type": "string", "optional": False }
-
-    def __init__(self, test_context):
-        super(CopycatDistributedFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
-            'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
-
-        self.cc = CopycatDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
-
-    def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True):
-        assert converter != None, "converter type must be set"
-        # Template parameters
-        self.key_converter = converter
-        self.value_converter = converter
-        self.schemas = schemas
-
-        self.cc.set_configs(lambda node: self.render("copycat-distributed.properties", node=node))
-
-        self.cc.start()
-
-        self.logger.info("Creating connectors")
-        for connector_props in [self.render("copycat-file-source.properties"), self.render("copycat-file-sink.properties")]:
-            connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
-            self.cc.create_connector(connector_config)
-
-        # Generating data on the source node should generate new records and create new output on the sink node. Timeouts
-        # here need to be more generous than they are for standalone mode because a) it takes longer to write configs,
-        # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile
-        for node in self.cc.nodes:
-            node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
-
-        # Restarting both should result in them picking up where they left off,
-        # only processing new data.
-        self.cc.restart()
-
-        for node in self.cc.nodes:
-            node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file")
-
-    def validate_output(self, input):
-        input_set = set(input)
-        # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
-        # Between the first and second rounds, we might even end up with half the data on each node.
-        output_set = set(itertools.chain(*[
-            [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
-        ]))
-        return input_set == output_set
-
-
-    def file_contents(self, node, file):
-        try:
-            # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
-            # immediately
-            return list(node.account.ssh_capture("cat " + file))
-        except subprocess.CalledProcessError:
-            return []

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/copycat_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_rest_test.py b/tests/kafkatest/tests/copycat_rest_test.py
deleted file mode 100644
index ad80e47..0000000
--- a/tests/kafkatest/tests/copycat_rest_test.py
+++ /dev/null
@@ -1,163 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.copycat import CopycatDistributedService, CopycatRestError
-from ducktape.utils.util import wait_until
-import hashlib, subprocess, json, itertools
-
-class CopycatRestApiTest(KafkaTest):
-    """
-    Test of Copycat's REST API endpoints.
-    """
-
-    INPUT_FILE = "/mnt/copycat.input"
-    INPUT_FILE2 = "/mnt/copycat.input2"
-    OUTPUT_FILE = "/mnt/copycat.output"
-
-    TOPIC = "test"
-    OFFSETS_TOPIC = "copycat-offsets"
-    CONFIG_TOPIC = "copycat-configs"
-
-    # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
-    # across all nodes.
-    INPUT_LIST = ["foo", "bar", "baz"]
-    INPUTS = "\n".join(INPUT_LIST) + "\n"
-    LONGER_INPUT_LIST = ["foo", "bar", "baz", "razz", "ma", "tazz"]
-    LONER_INPUTS = "\n".join(LONGER_INPUT_LIST) + "\n"
-
-    SCHEMA = { "type": "string", "optional": False }
-
-    def __init__(self, test_context):
-        super(CopycatRestApiTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
-            'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
-
-        self.cc = CopycatDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
-
-    def test_rest_api(self):
-        # Template parameters
-        self.key_converter = "org.apache.kafka.copycat.json.JsonConverter"
-        self.value_converter = "org.apache.kafka.copycat.json.JsonConverter"
-        self.schemas = True
-
-        self.cc.set_configs(lambda node: self.render("copycat-distributed.properties", node=node))
-
-        self.cc.start()
-
-        assert self.cc.list_connectors() == []
-
-        self.logger.info("Creating connectors")
-        source_connector_props = self.render("copycat-file-source.properties")
-        sink_connector_props = self.render("copycat-file-sink.properties")
-        for connector_props in [source_connector_props, sink_connector_props]:
-            connector_config = self._config_dict_from_props(connector_props)
-            self.cc.create_connector(connector_config)
-
-        # We should see the connectors appear
-        wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]),
-                   timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing")
-
-        # We'll only do very simple validation that the connectors and tasks really ran.
-        for node in self.cc.nodes:
-            node.account.ssh("echo -e -n " + repr(self.INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
-
-
-        # Trying to create the same connector again should cause an error
-        try:
-            self.cc.create_connector(self._config_dict_from_props(source_connector_props))
-            assert False, "creating the same connector should have caused a conflict"
-        except CopycatRestError:
-            pass # expected
-
-        # Validate that we can get info about connectors
-        expected_source_info = {
-            'name': 'local-file-source',
-            'config': self._config_dict_from_props(source_connector_props),
-            'tasks': [{ 'connector': 'local-file-source', 'task': 0 }]
-        }
-        source_info = self.cc.get_connector("local-file-source")
-        assert expected_source_info == source_info, "Incorrect info:" + json.dumps(source_info)
-        source_config = self.cc.get_connector_config("local-file-source")
-        assert expected_source_info['config'] == source_config, "Incorrect config: " + json.dumps(source_config)
-        expected_sink_info = {
-            'name': 'local-file-sink',
-            'config': self._config_dict_from_props(sink_connector_props),
-            'tasks': [{ 'connector': 'local-file-sink', 'task': 0 }]
-        }
-        sink_info = self.cc.get_connector("local-file-sink")
-        assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info)
-        sink_config = self.cc.get_connector_config("local-file-sink")
-        assert expected_sink_info['config'] == sink_config, "Incorrect config: " + json.dumps(sink_config)
-
-
-        # Validate that we can get info about tasks. This info should definitely be available now without waiting since
-        # we've already seen data appear in files.
-        # TODO: It would be nice to validate a complete listing, but that doesn't make sense for the file connectors
-        expected_source_task_info = [{
-            'id': { 'connector': 'local-file-source', 'task': 0 },
-            'config': {
-                'task.class': 'org.apache.kafka.copycat.file.FileStreamSourceTask',
-                'file': self.INPUT_FILE,
-                'topic': self.TOPIC
-            }
-        }]
-        source_task_info = self.cc.get_connector_tasks("local-file-source")
-        assert expected_source_task_info == source_task_info, "Incorrect info:" + json.dumps(source_task_info)
-        expected_sink_task_info = [{
-            'id': { 'connector': 'local-file-sink', 'task': 0 },
-            'config': {
-                'task.class': 'org.apache.kafka.copycat.file.FileStreamSinkTask',
-                'file': self.OUTPUT_FILE,
-                'topics': self.TOPIC
-            }
-        }]
-        sink_task_info = self.cc.get_connector_tasks("local-file-sink")
-        assert expected_sink_task_info == sink_task_info, "Incorrect info:" + json.dumps(sink_task_info)
-
-        file_source_config = self._config_dict_from_props(source_connector_props)
-        file_source_config['file'] = self.INPUT_FILE2
-        self.cc.set_connector_config("local-file-source", file_source_config)
-
-        # We should also be able to verify that the modified configs caused the tasks to move to the new file and pick up
-        # more data.
-        for node in self.cc.nodes:
-            node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2)
-        wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
-
-        self.cc.delete_connector("local-file-source")
-        self.cc.delete_connector("local-file-sink")
-        wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
-
-    def validate_output(self, input):
-        input_set = set(input)
-        # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
-        output_set = set(itertools.chain(*[
-            [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
-            ]))
-        return input_set == output_set
-
-
-    def file_contents(self, node, file):
-        try:
-            # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
-            # immediately
-            return list(node.account.ssh_capture("cat " + file))
-        except subprocess.CalledProcessError:
-            return []
-
-    def _config_dict_from_props(self, connector_props):
-        return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/copycat_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_test.py b/tests/kafkatest/tests/copycat_test.py
deleted file mode 100644
index 6ded1bf..0000000
--- a/tests/kafkatest/tests/copycat_test.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.copycat import CopycatStandaloneService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
-import hashlib, subprocess, json
-
-class CopycatStandaloneFileTest(KafkaTest):
-    """
-    Simple test of Copycat that produces data from a file in one Copycat
-    standalone process and consumes it on another, validating the output is
-    identical to the input.
-    """
-
-    INPUT_FILE = "/mnt/copycat.input"
-    OUTPUT_FILE = "/mnt/copycat.output"
-
-    OFFSETS_FILE = "/mnt/copycat.offsets"
-
-    TOPIC = "test"
-
-    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
-    FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
-    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
-    SECOND_INPUT = "\n".join(SECOND_INPUT_LIST) + "\n"
-
-    SCHEMA = { "type": "string", "optional": False }
-
-    def __init__(self, test_context):
-        super(CopycatStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
-            'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
-
-        self.source = CopycatStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = CopycatStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
-        self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
-
-    @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True)
-    @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=False)
-    @parametrize(converter="org.apache.kafka.copycat.storage.StringConverter", schemas=None)
-    def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True):
-        assert converter != None, "converter type must be set"
-        # Template parameters
-        self.key_converter = converter
-        self.value_converter = converter
-        self.schemas = schemas
-
-        self.source.set_configs(lambda node: self.render("copycat-standalone.properties", node=node), [self.render("copycat-file-source.properties")])
-        self.sink.set_configs(lambda node: self.render("copycat-standalone.properties", node=node), [self.render("copycat-file-sink.properties")])
-
-        self.source.start()
-        self.sink.start()
-
-        # Generating data on the source node should generate new records and create new output on the sink node
-        self.source.node.account.ssh("echo -e -n " + repr(self.FIRST_INPUT) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
-
-        # Restarting both should result in them picking up where they left off,
-        # only processing new data.
-        self.source.restart()
-        self.sink.restart()
-
-        self.source.node.account.ssh("echo -e -n " + repr(self.SECOND_INPUT) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT + self.SECOND_INPUT), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
-
-        # Validate the format of the data in the Kafka topic
-        self.consumer_validator.run()
-        expected = json.dumps([line if not self.schemas else { "schema": self.SCHEMA, "payload": line } for line in self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST])
-        decoder = (json.loads if converter.endswith("JsonConverter") else str)
-        actual = json.dumps([decoder(x) for x in self.consumer_validator.messages_consumed[1]])
-        assert expected == actual, "Expected %s but saw %s in Kafka" % (expected, actual)
-
-    def validate_output(self, value):
-        try:
-            output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
-            return output_hash == hashlib.md5(value).hexdigest()
-        except subprocess.CalledProcessError:
-            return False

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-distributed.properties b/tests/kafkatest/tests/templates/connect-distributed.properties
new file mode 100644
index 0000000..4a61b92
--- /dev/null
+++ b/tests/kafkatest/tests/templates/connect-distributed.properties
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bootstrap.servers={{ kafka.bootstrap_servers() }}
+
+group.id={{ group|default("connect-cluster") }}
+
+key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
+value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
+{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
+key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
+value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+
+internal.key.converter=org.apache.kafka.connect.json.JsonConverter
+internal.value.converter=org.apache.kafka.connect.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.topic={{ OFFSETS_TOPIC }}
+config.storage.topic={{ CONFIG_TOPIC }}
+
+# Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
+offset.flush.interval.ms=5000
+
+rest.advertised.host.name={{ node.account.hostname }}
\ No newline at end of file
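
The templates in this commit use Jinja2 syntax; the tests' render() calls fill them in from test attributes such as schemas and key_converter. Rendering the schemas fragment directly with the jinja2 package shows how the default/string/lower filters and the is-not-defined guard resolve (the context values below are made up for illustration):

    from jinja2 import Template

    fragment = (
        'key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}\n'
        '{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}'
        'key.converter.schemas.enable={{ schemas|default(True)|string|lower }}\n'
        '{% endif %}'
    )

    # schemas=False renders "false"; leaving key_converter unset falls back to the default
    print(Template(fragment).render(schemas=False))
    # key.converter=org.apache.kafka.connect.json.JsonConverter
    # key.converter.schemas.enable=false

    # A non-JSON converter skips the schemas.enable line entirely
    print(Template(fragment).render(
        key_converter="org.apache.kafka.connect.storage.StringConverter"))
    # key.converter=org.apache.kafka.connect.storage.StringConverter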

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/templates/connect-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-file-sink.properties b/tests/kafkatest/tests/templates/connect-file-sink.properties
new file mode 100644
index 0000000..f52c26e
--- /dev/null
+++ b/tests/kafkatest/tests/templates/connect-file-sink.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-sink
+connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
+tasks.max=1
+file={{ OUTPUT_FILE }}
+topics={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/templates/connect-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-file-source.properties b/tests/kafkatest/tests/templates/connect-file-source.properties
new file mode 100644
index 0000000..e8a6f05
--- /dev/null
+++ b/tests/kafkatest/tests/templates/connect-file-source.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-source
+connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
+tasks.max=1
+file={{ INPUT_FILE }}
+topic={{ TOPIC }}
\ No newline at end of file
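
For reference, with the values the tests above set (INPUT_FILE = /mnt/connect.input, TOPIC = test), this source template renders to an ordinary properties file:

    name=local-file-source
    connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
    tasks.max=1
    file=/mnt/connect.input
    topic=test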

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/templates/connect-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-standalone.properties b/tests/kafkatest/tests/templates/connect-standalone.properties
new file mode 100644
index 0000000..bf1daf7
--- /dev/null
+++ b/tests/kafkatest/tests/templates/connect-standalone.properties
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bootstrap.servers={{ kafka.bootstrap_servers() }}
+
+key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
+value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
+{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
+key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
+value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+
+internal.key.converter=org.apache.kafka.connect.json.JsonConverter
+internal.value.converter=org.apache.kafka.connect.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename={{ OFFSETS_FILE }}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/templates/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-distributed.properties b/tests/kafkatest/tests/templates/copycat-distributed.properties
deleted file mode 100644
index 1d76411..0000000
--- a/tests/kafkatest/tests/templates/copycat-distributed.properties
+++ /dev/null
@@ -1,40 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bootstrap.servers={{ kafka.bootstrap_servers() }}
-
-group.id={{ group|default("copycat-cluster") }}
-
-key.converter={{ key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
-value.converter={{ value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
-{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
-key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
-{% endif %}
-{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
-value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
-{% endif %}
-
-internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
-internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
-offset.storage.topic={{ OFFSETS_TOPIC }}
-config.storage.topic={{ CONFIG_TOPIC }}
-
-# Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
-offset.flush.interval.ms=5000
-
-rest.advertised.host.name = {{ node.account.hostname }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/templates/copycat-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-file-sink.properties b/tests/kafkatest/tests/templates/copycat-file-sink.properties
deleted file mode 100644
index 77c43c7..0000000
--- a/tests/kafkatest/tests/templates/copycat-file-sink.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name=local-file-sink
-connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
-tasks.max=1
-file={{ OUTPUT_FILE }}
-topics={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/templates/copycat-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-file-source.properties b/tests/kafkatest/tests/templates/copycat-file-source.properties
deleted file mode 100644
index 68dabc2..0000000
--- a/tests/kafkatest/tests/templates/copycat-file-source.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name=local-file-source
-connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
-tasks.max=1
-file={{ INPUT_FILE }}
-topic={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/tests/kafkatest/tests/templates/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-standalone.properties b/tests/kafkatest/tests/templates/copycat-standalone.properties
deleted file mode 100644
index c89490a..0000000
--- a/tests/kafkatest/tests/templates/copycat-standalone.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bootstrap.servers={{ kafka.bootstrap_servers() }}
-
-key.converter={{ key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
-value.converter={{ value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
-{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
-key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
-{% endif %}
-{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
-value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
-{% endif %}
-
-internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
-internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
-offset.storage.file.filename={{ OFFSETS_FILE }}

