kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [2/4] kafka git commit: Revert "KAFKA-4345; Run decktape test for each pull request"
Date Tue, 29 Nov 2016 17:11:56 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
new file mode 100644
index 0000000..15a9696
--- /dev/null
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+
+import json
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, TRUNK, KafkaVersion
+
+class TestUpgrade(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(TestUpgrade, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
+        self.logger.info("First pass bounce - rolling upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = TRUNK
+            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
+            node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
+            self.kafka.start_node(node)
+
+        self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
+            if to_message_format_version is None:
+                del node.config[config_property.MESSAGE_FORMAT_VERSION]
+            else:
+                node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
+            self.kafka.start_node(node)
+
+    @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
+    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
+    def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
+                     new_consumer=True, security_protocol="PLAINTEXT"):
+        """Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0 or 0.10.0 to the current version
+
+        from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x
+
+        If to_message_format_version is None, it means that we will upgrade to default (latest)
+        message format version. It is possible to upgrade to 0.10 brokers but still use message
+        format version 0.9
+
+        - Start 3 node broker cluster on version 'from_kafka_version'
+        - Start producer and consumer in the background
+        - Perform two-phase rolling upgrade
+            - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to
+            from_kafka_version and log.message.format.version set to from_kafka_version
+            - Second phase: remove inter.broker.protocol.version config with rolling bounce; if
+            to_message_format_version is set to 0.9, set log.message.format.version to
+            to_message_format_version, otherwise remove log.message.format.version config
+        - Finally, validate that every message acked by the producer was consumed by the consumer
+        """
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
+                                  version=KafkaVersion(from_kafka_version),
+                                  topics={self.topic: {"partitions": 3, "replication-factor": 3,
+                                                       'configs': {"min.insync.replicas": 2}}})
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.start()
+
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           compression_types=compression_types,
+                                           version=KafkaVersion(from_kafka_version))
+
+        assert self.zk.query("/cluster/id") is None
+
+        # TODO - reduce the timeout
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+                                        message_validator=is_int, version=KafkaVersion(from_kafka_version))
+
+        self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
+                                                                                        to_message_format_version))
+
+        cluster_id_json = self.zk.query("/cluster/id")
+        assert cluster_id_json is not None
+        try:
+            cluster_id = json.loads(cluster_id_json)
+        except :
+            self.logger.debug("Data in /cluster/id znode could not be parsed. Data = %s" % cluster_id_json)
+
+        self.logger.debug("Cluster id [%s]", cluster_id)
+        assert len(cluster_id["id"]) == 22

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
new file mode 100644
index 0000000..0cfdf16
--- /dev/null
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.security.kafka_acls import ACLs
+from kafkatest.utils import is_int
+
+class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
+    """Tests a rolling upgrade for zookeeper.
+    """
+
+    def __init__(self, test_context):
+        super(ZooKeeperSecurityUpgradeTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.group = "group"
+        self.producer_throughput = 100
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.acls = ACLs(self.test_context)
+
+        self.zk = ZookeeperService(self.test_context, num_nodes=3)
+
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+            "partitions": 3,
+            "replication-factor": 3,
+            'configs': {"min.insync.replicas": 2}}})
+
+    def create_producer_and_consumer(self):
+        self.producer = VerifiableProducer(
+            self.test_context, self.num_producers, self.kafka, self.topic,
+            throughput=self.producer_throughput)
+
+        self.consumer = ConsoleConsumer(
+            self.test_context, self.num_consumers, self.kafka, self.topic,
+            consumer_timeout_ms=60000, message_validator=is_int)
+
+        self.consumer.group_id = self.group
+
+    @property
+    def no_sasl(self):
+        return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol == "SSL"
+
+    @property
+    def is_secure(self):
+        return self.kafka.security_protocol == "SASL_PLAINTEXT" \
+               or self.kafka.security_protocol == "SSL" \
+               or self.kafka.security_protocol == "SASL_SSL"
+
+    def run_zk_migration(self):
+        # change zk config (auth provider + jaas login)
+        self.zk.kafka_opts = self.zk.security_system_properties
+        self.zk.zk_sasl = True
+        if self.no_sasl:
+            self.kafka.start_minikdc(self.zk.zk_principals)
+        # restart zk
+        for node in self.zk.nodes:
+            self.zk.stop_node(node)
+            self.zk.start_node(node)
+
+        # restart broker with jaas login
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            self.kafka.start_node(node)
+
+        # run migration tool
+        for node in self.zk.nodes:
+            self.zk.zookeeper_migration(node, "secure")
+
+        # restart broker with zookeeper.set.acl=true and acls
+        self.kafka.zk_set_acl = True
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            self.kafka.start_node(node)
+
+    @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"])
+    def test_zk_security_upgrade(self, security_protocol):
+        self.zk.start()
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+
+        # set acls
+        if self.is_secure:
+            self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
+            self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
+
+        if(self.no_sasl):
+            self.kafka.start()
+        else:
+            self.kafka.start(self.zk.zk_principals)
+
+        #Create Producer and Consumer
+        self.create_producer_and_consumer()
+
+        #Run upgrade
+        self.run_produce_consume_validate(self.run_zk_migration)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/__init__.py b/tests/kafkatest/tests/core1/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/consumer_group_command_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/consumer_group_command_test.py b/tests/kafkatest/tests/core1/consumer_group_command_test.py
deleted file mode 100644
index c3f59d9..0000000
--- a/tests/kafkatest/tests/core1/consumer_group_command_test.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-
-import os
-import re
-
-TOPIC = "topic-consumer-group-command"
-
-class ConsumerGroupCommandTest(Test):
-    """
-    Tests ConsumerGroupCommand
-    """
-    # Root directory for persistent output
-    PERSISTENT_ROOT = "/mnt/consumer_group_command"
-    COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")
-
-    def __init__(self, test_context):
-        super(ConsumerGroupCommandTest, self).__init__(test_context)
-        self.num_zk = 1
-        self.num_brokers = 1
-        self.topics = {
-            TOPIC: {'partitions': 1, 'replication-factor': 1}
-        }
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
-    def setUp(self):
-        self.zk.start()
-
-    def start_kafka(self, security_protocol, interbroker_security_protocol):
-        self.kafka = KafkaService(
-            self.test_context, self.num_brokers,
-            self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
-        self.kafka.start()
-
-    def start_consumer(self, security_protocol):
-        enable_new_consumer = security_protocol == SecurityConfig.SSL
-        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
-                                        consumer_timeout_ms=None, new_consumer=enable_new_consumer)
-        self.consumer.start()
-
-    def setup_and_verify(self, security_protocol, group=None):
-        self.start_kafka(security_protocol, security_protocol)
-        self.start_consumer(security_protocol)
-        consumer_node = self.consumer.nodes[0]
-        wait_until(lambda: self.consumer.alive(consumer_node),
-                   timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
-        kafka_node = self.kafka.nodes[0]
-        if security_protocol is not SecurityConfig.PLAINTEXT:
-            prop_file = str(self.kafka.security_config.client_config())
-            self.logger.debug(prop_file)
-            kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
-            kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)
-
-        # Verify ConsumerGroupCommand lists expected consumer groups
-        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
-        command_config_file = None
-        if enable_new_consumer:
-            command_config_file = self.COMMAND_CONFIG_FILE
-
-        if group:
-            wait_until(lambda: re.search("topic-consumer-group-command",self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file)), timeout_sec=10,
-                       err_msg="Timed out waiting to list expected consumer groups.")
-        else:
-            wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
-                       err_msg="Timed out waiting to list expected consumer groups.")
-
-        self.consumer.stop()
-
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
-        """
-        Tests if ConsumerGroupCommand is listing correct consumer groups
-        :return: None
-        """
-        self.setup_and_verify(security_protocol)
-
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
-        """
-        Tests if ConsumerGroupCommand is describing a consumer group correctly
-        :return: None
-        """
-        self.setup_and_verify(security_protocol, group="test-consumer-group")

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/get_offset_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/get_offset_shell_test.py b/tests/kafkatest/tests/core1/get_offset_shell_test.py
deleted file mode 100644
index 38bd9dc..0000000
--- a/tests/kafkatest/tests/core1/get_offset_shell_test.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from kafkatest.services.verifiable_producer import VerifiableProducer
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-
-TOPIC = "topic-get-offset-shell"
-MAX_MESSAGES = 100
-NUM_PARTITIONS = 1
-REPLICATION_FACTOR = 1
-
-class GetOffsetShellTest(Test):
-    """
-    Tests GetOffsetShell tool
-    """
-    def __init__(self, test_context):
-        super(GetOffsetShellTest, self).__init__(test_context)
-        self.num_zk = 1
-        self.num_brokers = 1
-        self.messages_received_count = 0
-        self.topics = {
-            TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
-        }
-
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
-
-
-    def setUp(self):
-        self.zk.start()
-
-    def start_kafka(self, security_protocol, interbroker_security_protocol):
-        self.kafka = KafkaService(
-            self.test_context, self.num_brokers,
-            self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
-        self.kafka.start()
-
-    def start_producer(self):
-        # This will produce to kafka cluster
-        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
-        self.producer.start()
-        current_acked = self.producer.num_acked
-        wait_until(lambda: self.producer.num_acked >= current_acked + MAX_MESSAGES, timeout_sec=10,
-                   err_msg="Timeout awaiting messages to be produced and acked")
-
-    def start_consumer(self, security_protocol):
-        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
-        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
-                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer)
-        self.consumer.start()
-
-    def test_get_offset_shell(self, security_protocol='PLAINTEXT'):
-        """
-        Tests if GetOffsetShell is getting offsets correctly
-        :return: None
-        """
-        self.start_kafka(security_protocol, security_protocol)
-        self.start_producer()
-
-        # Assert that offset fetched without any consumers consuming is 0
-        assert self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, 0)
-
-        self.start_consumer(security_protocol)
-
-        node = self.consumer.nodes[0]
-
-        wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
-
-        # Assert that offset is correctly indicated by GetOffsetShell tool
-        wait_until(lambda: "%s:%s:%s" % (TOPIC, NUM_PARTITIONS - 1, MAX_MESSAGES) in self.kafka.get_offset_shell(TOPIC, None, 1000, 1, -1), timeout_sec=10,
-                   err_msg="Timed out waiting to reach expected offset.")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/reassign_partitions_test.py b/tests/kafkatest/tests/core1/reassign_partitions_test.py
deleted file mode 100644
index 850e2aa..0000000
--- a/tests/kafkatest/tests/core1/reassign_partitions_test.py
+++ /dev/null
@@ -1,110 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-import random
-
-class ReassignPartitionsTest(ProduceConsumeValidateTest):
-    """
-    These tests validate partition reassignment.
-    Create a topic with few partitions, load some data, trigger partition re-assignment with and without broker failure,
-    check that partition re-assignment can complete and there is no data loss.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(ReassignPartitionsTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 20,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}
-                                                                })
-        self.num_partitions = 20
-        self.timeout_sec = 60
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        # Override this since we're adding services outside of the constructor
-        return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    def clean_bounce_some_brokers(self):
-        """Bounce every other broker"""
-        for node in self.kafka.nodes[::2]:
-            self.kafka.restart_node(node, clean_shutdown=True)
-
-    def reassign_partitions(self, bounce_brokers):
-        partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
-        self.logger.debug("Partitions before reassignment:" + str(partition_info))
-
-        # jumble partition assignment in dictionary
-        seed = random.randint(0, 2 ** 31 - 1)
-        self.logger.debug("Jumble partition assignment with seed " + str(seed))
-        random.seed(seed)
-        # The list may still be in order, but that's ok
-        shuffled_list = range(0, self.num_partitions)
-        random.shuffle(shuffled_list)
-
-        for i in range(0, self.num_partitions):
-            partition_info["partitions"][i]["partition"] = shuffled_list[i]
-        self.logger.debug("Jumbled partitions: " + str(partition_info))
-
-        # send reassign partitions command
-        self.kafka.execute_reassign_partitions(partition_info)
-
-        if bounce_brokers:
-            # bounce a few brokers at the same time
-            self.clean_bounce_some_brokers()
-
-        # Wait until finished or timeout
-        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
-
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
-    def test_reassign_partitions(self, bounce_brokers, security_protocol):
-        """Reassign partitions tests.
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
-
-            - Produce messages in the background
-            - Consume messages in the background
-            - Reassign partitions
-            - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress
-            - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming
-            - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
-        self.kafka.start()
-
-        self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/simple_consumer_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/simple_consumer_shell_test.py b/tests/kafkatest/tests/core1/simple_consumer_shell_test.py
deleted file mode 100644
index 74a7eeb..0000000
--- a/tests/kafkatest/tests/core1/simple_consumer_shell_test.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
-from kafkatest.services.verifiable_producer import VerifiableProducer
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-TOPIC = "topic-simple-consumer-shell"
-MAX_MESSAGES = 100
-NUM_PARTITIONS = 1
-REPLICATION_FACTOR = 1
-
-class SimpleConsumerShellTest(Test):
-    """
-    Tests SimpleConsumerShell tool
-    """
-    def __init__(self, test_context):
-        super(SimpleConsumerShellTest, self).__init__(test_context)
-        self.num_zk = 1
-        self.num_brokers = 1
-        self.messages_received_count = 0
-        self.topics = {
-            TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
-        }
-
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
-    def setUp(self):
-        self.zk.start()
-
-    def start_kafka(self):
-        self.kafka = KafkaService(
-            self.test_context, self.num_brokers,
-            self.zk, topics=self.topics)
-        self.kafka.start()
-
-    def run_producer(self):
-        # This will produce to kafka cluster
-        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
-        self.producer.start()
-        wait_until(lambda: self.producer.num_acked == MAX_MESSAGES, timeout_sec=10,
-                   err_msg="Timeout awaiting messages to be produced and acked")
-
-    def start_simple_consumer_shell(self):
-        self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC)
-        self.simple_consumer_shell.start()
-
-    def test_simple_consumer_shell(self):
-        """
-        Tests if SimpleConsumerShell is fetching expected records
-        :return: None
-        """
-        self.start_kafka()
-        self.run_producer()
-        self.start_simple_consumer_shell()
-
-        # Assert that SimpleConsumerShell is fetching expected number of messages
-        wait_until(lambda: self.simple_consumer_shell.get_output().count("\n") == (MAX_MESSAGES + 1), timeout_sec=10,
-                   err_msg="Timed out waiting to receive expected number of messages.")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core1/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core1/throttling_test.py b/tests/kafkatest/tests/core1/throttling_test.py
deleted file mode 100644
index 2e21322..0000000
--- a/tests/kafkatest/tests/core1/throttling_test.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-import math
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.utils import is_int
-
-
-class ThrottlingTest(ProduceConsumeValidateTest):
-    """Tests throttled partition reassignment. This is essentially similar
-    to the reassign_partitions_test, except that we throttle the reassignment
-    and verify that it takes a sensible amount of time given the throttle
-    and the amount of data being moved.
-
-    Since the correctness is time dependent, this test also simplifies the
-    cluster topology. In particular, we fix the number of brokers, the
-    replication-factor, the number of partitions, the partition size, and
-    the number of partitions being moved so that we can accurately predict
-    the time throttled reassignment should take.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(ThrottlingTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        # Because we are starting the producer/consumer/validate cycle _after_
-        # seeding the cluster with big data (to test throttling), we need to
-        # Start the consumer from the end of the stream. further, we need to
-        # ensure that the consumer is fully started before the producer starts
-        # so that we don't miss any messages. This timeout ensures the sufficient
-        # condition.
-        self.consumer_init_timeout_sec =  10
-        self.num_brokers = 6
-        self.num_partitions = 3
-        self.kafka = KafkaService(test_context,
-                                  num_nodes=self.num_brokers,
-                                  zk=self.zk,
-                                  topics={
-                                      self.topic: {
-                                          "partitions": self.num_partitions,
-                                          "replication-factor": 2,
-                                          "configs": {
-                                              "segment.bytes": 64 * 1024 * 1024
-                                          }
-                                      }
-                                  })
-        self.producer_throughput = 1000
-        self.timeout_sec = 400
-        self.num_records = 2000
-        self.record_size = 4096 * 100  # 400 KB
-        # 1 MB per partition on average.
-        self.partition_size = (self.num_records * self.record_size) / self.num_partitions
-        self.num_producers = 2
-        self.num_consumers = 1
-        self.throttle = 4 * 1024 * 1024  # 4 MB/s
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        # Override this since we're adding services outside of the constructor
-        return super(ThrottlingTest, self).min_cluster_size() +\
-            self.num_producers + self.num_consumers
-
-    def clean_bounce_some_brokers(self):
-        """Bounce every other broker"""
-        for node in self.kafka.nodes[::2]:
-            self.kafka.restart_node(node, clean_shutdown=True)
-
-    def reassign_partitions(self, bounce_brokers, throttle):
-        """This method reassigns partitions using a throttle. It makes an
-        assertion about the minimum amount of time the reassignment should take
-        given the value of the throttle, the number of partitions being moved,
-        and the size of each partition.
-        """
-        partition_info = self.kafka.parse_describe_topic(
-            self.kafka.describe_topic(self.topic))
-        self.logger.debug("Partitions before reassignment:" +
-                          str(partition_info))
-        max_num_moves = 0
-        for i in range(0, self.num_partitions):
-            old_replicas = set(partition_info["partitions"][i]["replicas"])
-            new_part = (i+1) % self.num_partitions
-            new_replicas = set(partition_info["partitions"][new_part]["replicas"])
-            max_num_moves = max(len(new_replicas - old_replicas), max_num_moves)
-            partition_info["partitions"][i]["partition"] = new_part
-        self.logger.debug("Jumbled partitions: " + str(partition_info))
-
-        self.kafka.execute_reassign_partitions(partition_info,
-                                               throttle=throttle)
-        start = time.time()
-        if bounce_brokers:
-            # bounce a few brokers at the same time
-            self.clean_bounce_some_brokers()
-
-        # Wait until finished or timeout
-        size_per_broker = max_num_moves * self.partition_size
-        self.logger.debug("Max amount of data transfer per broker: %fb",
-                          size_per_broker)
-        estimated_throttled_time = math.ceil(float(size_per_broker) /
-                                             self.throttle)
-        estimated_time_with_buffer = estimated_throttled_time * 2
-        self.logger.debug("Waiting %ds for the reassignment to complete",
-                          estimated_time_with_buffer)
-        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info),
-                   timeout_sec=estimated_time_with_buffer, backoff_sec=.5)
-        stop = time.time()
-        time_taken = stop - start
-        self.logger.debug("Transfer took %d second. Estimated time : %ds",
-                          time_taken,
-                          estimated_throttled_time)
-        assert time_taken >= estimated_throttled_time, \
-            ("Expected rebalance to take at least %ds, but it took %ds" % (
-                estimated_throttled_time,
-                time_taken))
-
-    @parametrize(bounce_brokers=False)
-    @parametrize(bounce_brokers=True)
-    def test_throttled_reassignment(self, bounce_brokers):
-        security_protocol = 'PLAINTEXT'
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-
-        producer_id = 'bulk_producer'
-        bulk_producer = ProducerPerformanceService(
-            context=self.test_context, num_nodes=1, kafka=self.kafka,
-            topic=self.topic, num_records=self.num_records,
-            record_size=self.record_size, throughput=-1, client_id=producer_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id],
-            jmx_attributes=['outgoing-byte-rate'])
-
-
-        self.producer = VerifiableProducer(context=self.test_context,
-                                           num_nodes=1,
-                                           kafka=self.kafka, topic=self.topic,
-                                           message_validator=is_int,
-                                           throughput=self.producer_throughput)
-
-        self.consumer = ConsoleConsumer(self.test_context,
-                                        self.num_consumers,
-                                        self.kafka,
-                                        self.topic,
-                                        consumer_timeout_ms=60000,
-                                        message_validator=is_int,
-                                        from_beginning=False)
-
-        self.kafka.start()
-        bulk_producer.run()
-        self.run_produce_consume_validate(core_test_action=
-                                          lambda: self.reassign_partitions(bounce_brokers, self.throttle))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core2/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core2/__init__.py b/tests/kafkatest/tests/core2/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/tests/kafkatest/tests/core2/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py
deleted file mode 100644
index d6a0a12..0000000
--- a/tests/kafkatest/tests/core2/compatibility_test_new_broker_test.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka import config_property
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
-
-
-# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
-class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.messages_per_producer = 1000
-
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
-    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
-       
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
-        for node in self.kafka.nodes:
-            if timestamp_type is not None:
-                node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
-        self.kafka.start()
-         
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           compression_types=compression_types,
-                                           version=KafkaVersion(producer_version))
-
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
-                                        message_validator=is_int, version=KafkaVersion(consumer_version))
-
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/mirror_maker/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/mirror_maker/__init__.py b/tests/kafkatest/tests/mirror_maker/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py b/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py
deleted file mode 100644
index afb1972..0000000
--- a/tests/kafkatest/tests/mirror_maker/mirror_maker_test.py
+++ /dev/null
@@ -1,179 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize, matrix, ignore
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.mirror_maker import MirrorMaker
-from kafkatest.services.security.minikdc import MiniKdc
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-
-import time
-
-
-class TestMirrorMakerService(ProduceConsumeValidateTest):
-    """Sanity checks on mirror maker service class."""
-    def __init__(self, test_context):
-        super(TestMirrorMakerService, self).__init__(test_context)
-
-        self.topic = "topic"
-        self.source_zk = ZookeeperService(test_context, num_nodes=1)
-        self.target_zk = ZookeeperService(test_context, num_nodes=1)
-
-        self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
-                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
-        self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
-                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
-        # This will produce to source kafka cluster
-        self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
-                                           throughput=1000)
-        self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
-                                        whitelist=self.topic, offset_commit_interval_ms=1000)
-        # This will consume from target kafka cluster
-        self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
-                                        message_validator=is_int, consumer_timeout_ms=60000)
-
-    def setUp(self):
-        # Source cluster
-        self.source_zk.start()
-
-        # Target cluster
-        self.target_zk.start()
-
-    def start_kafka(self, security_protocol):
-        self.source_kafka.security_protocol = security_protocol
-        self.source_kafka.interbroker_security_protocol = security_protocol
-        self.target_kafka.security_protocol = security_protocol
-        self.target_kafka.interbroker_security_protocol = security_protocol
-        if self.source_kafka.security_config.has_sasl_kerberos:
-            minikdc = MiniKdc(self.source_kafka.context, self.source_kafka.nodes + self.target_kafka.nodes)
-            self.source_kafka.minikdc = minikdc
-            self.target_kafka.minikdc = minikdc
-            minikdc.start()
-        self.source_kafka.start()
-        self.target_kafka.start()
-
-    def bounce(self, clean_shutdown=True):
-        """Bounce mirror maker with a clean (kill -15) or hard (kill -9) shutdown"""
-
-        # Wait until messages start appearing in the target cluster
-        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=15)
-
-        # Wait for at least one offset to be committed.
-        #
-        # This step is necessary to prevent data loss with default mirror maker settings:
-        # currently, if we don't have at least one committed offset,
-        # and we bounce mirror maker, the consumer internals will throw OffsetOutOfRangeException, and the default
-        # auto.offset.reset policy ("largest") will kick in, causing mirrormaker to start consuming from the largest
-        # offset. As a result, any messages produced to the source cluster while mirrormaker was dead won't get
-        # mirrored to the target cluster.
-        # (see https://issues.apache.org/jira/browse/KAFKA-2759)
-        #
-        # This isn't necessary with kill -15 because mirror maker commits its offsets during graceful
-        # shutdown.
-        if not clean_shutdown:
-            time.sleep(self.mirror_maker.offset_commit_interval_ms / 1000.0 + .5)
-
-        for i in range(3):
-            self.logger.info("Bringing mirror maker nodes down...")
-            for node in self.mirror_maker.nodes:
-                self.mirror_maker.stop_node(node, clean_shutdown=clean_shutdown)
-
-            num_consumed = len(self.consumer.messages_consumed[1])
-            self.logger.info("Bringing mirror maker nodes back up...")
-            for node in self.mirror_maker.nodes:
-                self.mirror_maker.start_node(node)
-
-            # Ensure new messages are once again showing up on the target cluster
-            # new consumer requires higher timeout here
-            wait_until(lambda: len(self.consumer.messages_consumed[1]) > num_consumed + 100, timeout_sec=60)
-
-    def wait_for_n_messages(self, n_messages=100):
-        """Wait for a minimum number of messages to be successfully produced."""
-        wait_until(lambda: self.producer.num_acked > n_messages, timeout_sec=10,
-                     err_msg="Producer failed to produce %d messages in a reasonable amount of time." % n_messages)
-
-    @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'], new_consumer=[True])
-    def test_simple_end_to_end(self, security_protocol, new_consumer):
-        """
-        Test end-to-end behavior under non-failure conditions.
-
-        Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
-        One is source, and the other is target. Single-node mirror maker mirrors from source to target.
-
-        - Start mirror maker.
-        - Produce a small number of messages to the source cluster.
-        - Consume messages from target.
-        - Verify that number of consumed messages matches the number produced.
-        """
-        self.start_kafka(security_protocol)
-        self.consumer.new_consumer = new_consumer
-
-        self.mirror_maker.new_consumer = new_consumer
-        self.mirror_maker.start()
-
-        mm_node = self.mirror_maker.nodes[0]
-        with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
-            if new_consumer:
-                monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-            else:
-                monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-
-        self.run_produce_consume_validate(core_test_action=self.wait_for_n_messages)
-        self.mirror_maker.stop()
-
-    @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
-    @matrix(new_consumer=[True], clean_shutdown=[True, False], security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True, security_protocol='PLAINTEXT'):
-        """
-        Test end-to-end behavior under failure conditions.
-
-        Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
-        One is source, and the other is target. Single-node mirror maker mirrors from source to target.
-
-        - Start mirror maker.
-        - Produce to source cluster, and consume from target cluster in the background.
-        - Bounce MM process
-        - Verify every message acknowledged by the source producer is consumed by the target consumer
-        """
-        if new_consumer and not clean_shutdown:
-            # Increase timeout on downstream console consumer; mirror maker with new consumer takes extra time
-            # during hard bounce. This is because the restarted mirror maker consumer won't be able to rejoin
-            # the group until the previous session times out
-            self.consumer.consumer_timeout_ms = 60000
-
-        self.start_kafka(security_protocol)
-        self.consumer.new_consumer = new_consumer
-
-        self.mirror_maker.offsets_storage = offsets_storage
-        self.mirror_maker.new_consumer = new_consumer
-        self.mirror_maker.start()
-
-        # Wait until mirror maker has reset fetch offset at least once before continuing with the rest of the test
-        mm_node = self.mirror_maker.nodes[0]
-        with mm_node.account.monitor_log(self.mirror_maker.LOG_FILE) as monitor:
-            if new_consumer:
-                monitor.wait_until("Resetting offset for partition", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-            else:
-                monitor.wait_until("reset fetch offset", timeout_sec=30, err_msg="Mirrormaker did not reset fetch offset in a reasonable amount of time.")
-
-        self.run_produce_consume_validate(core_test_action=lambda: self.bounce(clean_shutdown=clean_shutdown))
-        self.mirror_maker.stop()

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 3b54ad7..801ccde 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import traceback
 
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
@@ -103,7 +102,7 @@ class ProduceConsumeValidateTest(Test):
         except BaseException as e:
             for s in self.test_context.services:
                 self.mark_for_collect(s)
-            raise Exception(traceback.format_exc(e))
+            raise
 
     @staticmethod
     def annotate_missing_msgs(missing, acked, consumed, msg):

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/replication/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication/__init__.py b/tests/kafkatest/tests/replication/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/replication/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication/replication_test.py b/tests/kafkatest/tests/replication/replication_test.py
deleted file mode 100644
index f815034..0000000
--- a/tests/kafkatest/tests/replication/replication_test.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-
-import signal
-
-def broker_node(test, broker_type):
-    """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
-    """
-    if broker_type == "leader":
-        node = test.kafka.leader(test.topic, partition=0)
-    elif broker_type == "controller":
-        node = test.kafka.controller()
-    else:
-        raise Exception("Unexpected broker type %s." % (broker_type))
-
-    return node
-
-def clean_shutdown(test, broker_type):
-    """Discover broker node of requested type and shut it down cleanly.
-    """
-    node = broker_node(test, broker_type)
-    test.kafka.signal_node(node, sig=signal.SIGTERM)
-
-
-def hard_shutdown(test, broker_type):
-    """Discover broker node of requested type and shut it down with a hard kill."""
-    node = broker_node(test, broker_type)
-    test.kafka.signal_node(node, sig=signal.SIGKILL)
-
-
-def clean_bounce(test, broker_type):
-    """Chase the leader of one partition and restart it cleanly."""
-    for i in range(5):
-        prev_broker_node = broker_node(test, broker_type)
-        test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
-
-
-def hard_bounce(test, broker_type):
-    """Chase the leader and restart it with a hard kill."""
-    for i in range(5):
-        prev_broker_node = broker_node(test, broker_type)
-        test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
-
-        # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper has registered the loss by expiring the broker's session timeout.
-
-        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
-                   timeout_sec=test.kafka.zk_session_timeout + 5,
-                   err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
-
-        test.kafka.start_node(prev_broker_node)
-
-failures = {
-    "clean_shutdown": clean_shutdown,
-    "hard_shutdown": hard_shutdown,
-    "clean_bounce": clean_bounce,
-    "hard_bounce": hard_bounce
-}
-
-
-class ReplicationTest(ProduceConsumeValidateTest):
-    """
-    Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
-    (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
-    too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
-    ordering guarantees.
-
-    Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
-    we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
-
-    Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
-    consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
-    indicator that nothing is left to consume.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(ReplicationTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}
-                                                                })
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        """Override this since we're adding services outside of the constructor"""
-        return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-
-    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            broker_type=["leader"],
-            security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
-    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            broker_type=["controller"],
-            security_protocol=["PLAINTEXT", "SASL_SSL"])
-    @matrix(failure_mode=["hard_bounce"],
-            broker_type=["leader"],
-            security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
-    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
-        """Replication tests.
-        These tests verify that replication provides simple durability guarantees by checking that data acked by
-        brokers is still available for consumption in the face of various failure scenarios.
-
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
-
-            - Produce messages in the background
-            - Consume messages in the background
-            - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
-            - When done driving failures, stop producing, and finish consuming
-            - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        self.kafka.client_sasl_mechanism = client_sasl_mechanism
-        self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
-        new_consumer = False if  self.kafka.security_protocol == "PLAINTEXT" else True
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
-        self.kafka.start()
-        
-        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/security1/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security1/__init__.py b/tests/kafkatest/tests/security1/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/security1/security_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security1/security_test.py b/tests/kafkatest/tests/security1/security_test.py
deleted file mode 100644
index b6bc656..0000000
--- a/tests/kafkatest/tests/security1/security_test.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.security.security_config import SslStores
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-import time
-
-class TestSslStores(SslStores):
-    def __init__(self):
-        super(TestSslStores, self).__init__()
-        self.invalid_hostname = False
-        self.generate_ca()
-        self.generate_truststore()
-
-    def hostname(self, node):
-        if (self.invalid_hostname):
-            return "invalidhost"
-        else:
-            return super(TestSslStores, self).hostname(node)
-
-class SecurityTest(ProduceConsumeValidateTest):
-    """
-    These tests validate security features.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(SecurityTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 2,
-                                                                    "replication-factor": 1}
-                                                                })
-        self.num_partitions = 2
-        self.timeout_sec = 10000
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
-    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
-    def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
-        """
-        Test that invalid hostname in certificate results in connection failures.
-        When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
-        When security_protocol=PLAINTEXT and interbroker_security_protocol=SSL, controller connections fail
-        with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
-        """
-
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = interbroker_security_protocol
-        SecurityConfig.ssl_stores = TestSslStores()
-
-        SecurityConfig.ssl_stores.invalid_hostname = True
-        self.kafka.start()
-        self.create_producer_and_consumer()
-        self.producer.log_level = "TRACE"
-        self.producer.start()
-        self.consumer.start()
-        time.sleep(10)
-        assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname"
-        error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
-        for node in self.producer.nodes:
-            node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
-        for node in self.consumer.nodes:
-            node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
-
-        self.producer.stop()
-        self.consumer.stop()
-        self.producer.log_level = "INFO"
-
-        SecurityConfig.ssl_stores.invalid_hostname = False
-        for node in self.kafka.nodes:
-            self.kafka.restart_node(node, clean_shutdown=True)
-        self.create_producer_and_consumer()
-        self.run_produce_consume_validate()
-
-    def create_producer_and_consumer(self):
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int)
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py
deleted file mode 100644
index 0cfdf16..0000000
--- a/tests/kafkatest/tests/security1/zookeeper_security_upgrade_test.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.security.kafka_acls import ACLs
-from kafkatest.utils import is_int
-
-class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
-    """Tests a rolling upgrade for zookeeper.
-    """
-
-    def __init__(self, test_context):
-        super(ZooKeeperSecurityUpgradeTest, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.group = "group"
-        self.producer_throughput = 100
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.acls = ACLs(self.test_context)
-
-        self.zk = ZookeeperService(self.test_context, num_nodes=3)
-
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-            "partitions": 3,
-            "replication-factor": 3,
-            'configs': {"min.insync.replicas": 2}}})
-
-    def create_producer_and_consumer(self):
-        self.producer = VerifiableProducer(
-            self.test_context, self.num_producers, self.kafka, self.topic,
-            throughput=self.producer_throughput)
-
-        self.consumer = ConsoleConsumer(
-            self.test_context, self.num_consumers, self.kafka, self.topic,
-            consumer_timeout_ms=60000, message_validator=is_int)
-
-        self.consumer.group_id = self.group
-
-    @property
-    def no_sasl(self):
-        return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol == "SSL"
-
-    @property
-    def is_secure(self):
-        return self.kafka.security_protocol == "SASL_PLAINTEXT" \
-               or self.kafka.security_protocol == "SSL" \
-               or self.kafka.security_protocol == "SASL_SSL"
-
-    def run_zk_migration(self):
-        # change zk config (auth provider + jaas login)
-        self.zk.kafka_opts = self.zk.security_system_properties
-        self.zk.zk_sasl = True
-        if self.no_sasl:
-            self.kafka.start_minikdc(self.zk.zk_principals)
-        # restart zk
-        for node in self.zk.nodes:
-            self.zk.stop_node(node)
-            self.zk.start_node(node)
-
-        # restart broker with jaas login
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            self.kafka.start_node(node)
-
-        # run migration tool
-        for node in self.zk.nodes:
-            self.zk.zookeeper_migration(node, "secure")
-
-        # restart broker with zookeeper.set.acl=true and acls
-        self.kafka.zk_set_acl = True
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            self.kafka.start_node(node)
-
-    @matrix(security_protocol=["PLAINTEXT","SSL","SASL_SSL","SASL_PLAINTEXT"])
-    def test_zk_security_upgrade(self, security_protocol):
-        self.zk.start()
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-
-        # set acls
-        if self.is_secure:
-            self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
-            self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
-
-        if(self.no_sasl):
-            self.kafka.start()
-        else:
-            self.kafka.start(self.zk.zk_principals)
-
-        #Create Producer and Consumer
-        self.create_producer_and_consumer()
-
-        #Run upgrade
-        self.run_produce_consume_validate(self.run_zk_migration)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5d28149/tests/kafkatest/tests/security2/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security2/__init__.py b/tests/kafkatest/tests/security2/__init__.py
deleted file mode 100644
index e69de29..0000000


Mime
View raw message