kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: HOTFIX: Renamed tests to match expected suffix
Date Fri, 18 Mar 2016 19:02:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e89a9ce1a -> 0d8cbbcb2


HOTFIX: Renamed tests to match expected suffix

ewencp gwenshap granders could you have a look please? Thanks.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1096 from enothereska/systest-hotfix-name


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0d8cbbcb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0d8cbbcb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0d8cbbcb

Branch: refs/heads/trunk
Commit: 0d8cbbcb208ccaf1cb84df0440331d4cef064391
Parents: e89a9ce
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Fri Mar 18 12:01:56 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Mar 18 12:01:56 2016 -0700

----------------------------------------------------------------------
 .../tests/compatibility_test_new_broker.py      | 78 -----------------
 .../tests/compatibility_test_new_broker_test.py | 78 +++++++++++++++++
 .../kafkatest/tests/consumer_rolling_upgrade.py | 82 -----------------
 .../tests/consumer_rolling_upgrade_test.py      | 82 +++++++++++++++++
 tests/kafkatest/tests/message_format_change.py  | 92 --------------------
 .../tests/message_format_change_test.py         | 92 ++++++++++++++++++++
 6 files changed, 252 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/compatibility_test_new_broker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test_new_broker.py b/tests/kafkatest/tests/compatibility_test_new_broker.py
deleted file mode 100644
index 2c261df..0000000
--- a/tests/kafkatest/tests/compatibility_test_new_broker.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.kafka import config_property
-
-# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
-class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.messages_per_producer = 1000
-
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime"))
-    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None):
-       
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
-        for node in self.kafka.nodes:
-            if timestamp_type is not None:
-                node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
-        self.kafka.start()
-         
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           compression_types=compression_types,
-                                           version=KafkaVersion(producer_version))
-
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
-                                        message_validator=is_int, version=KafkaVersion(consumer_version))
-
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/compatibility_test_new_broker_test.py
new file mode 100644
index 0000000..2c261df
--- /dev/null
+++ b/tests/kafkatest/tests/compatibility_test_new_broker_test.py
@@ -0,0 +1,78 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.kafka import config_property
+
+# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
+class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.messages_per_producer = 1000
+
+    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None)
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None)
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime"))
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime"))
+    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None):
+       
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    'configs': {"min.insync.replicas": 2}}})
+        for node in self.kafka.nodes:
+            if timestamp_type is not None:
+                node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
+        self.kafka.start()
+         
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           compression_types=compression_types,
+                                           version=KafkaVersion(producer_version))
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+                                        message_validator=is_int, version=KafkaVersion(consumer_version))
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/consumer_rolling_upgrade.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade.py b/tests/kafkatest/tests/consumer_rolling_upgrade.py
deleted file mode 100644
index 3cd3c7c..0000000
--- a/tests/kafkatest/tests/consumer_rolling_upgrade.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 4
-    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
-    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
-
-    def __init__(self, test_context):
-        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
-                                                         num_zk=1, num_brokers=1, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
-        })
-
-    def _verify_range_assignment(self, consumer):
-        # range assignment should give us two partition sets: (0, 1) and (2, 3)
-        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
-            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
-
-    def _verify_roundrobin_assignment(self, consumer):
-        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
-            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
-
-    def rolling_update_test(self):
-        """
-        Verify rolling updates of partition assignment strategies works correctly. In this
-        test, we use a rolling restart to change the group's assignment strategy from "range"
-        to "roundrobin." We verify after every restart that all members are still in the group
-        and that the correct assignment strategy was used.
-        """
-
-        # initialize the consumer using range assignment
-        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
-
-        consumer.start()
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-
-        # change consumer configuration to prefer round-robin assignment, but still support range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
-
-        # restart one of the nodes and verify that we are still using range assignment
-        consumer.stop_node(consumer.nodes[0])
-        consumer.start_node(consumer.nodes[0])
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-        
-        # now restart the other node and verify that we have switched to round-robin
-        consumer.stop_node(consumer.nodes[1])
-        consumer.start_node(consumer.nodes[1])
-        self.await_all_members(consumer)
-        self._verify_roundrobin_assignment(consumer)
-
-        # if we want, we can now drop support for range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN
-        for node in consumer.nodes:
-            consumer.stop_node(node)
-            consumer.start_node(node)
-            self.await_all_members(consumer)
-            self._verify_roundrobin_assignment(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/consumer_rolling_upgrade_test.py
new file mode 100644
index 0000000..3cd3c7c
--- /dev/null
+++ b/tests/kafkatest/tests/consumer_rolling_upgrade_test.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
+    TOPIC = "test_topic"
+    NUM_PARTITIONS = 4
+    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+    def __init__(self, test_context):
+        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
+                                                         num_zk=1, num_brokers=1, topics={
+            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+        })
+
+    def _verify_range_assignment(self, consumer):
+        # range assignment should give us two partition sets: (0, 1) and (2, 3)
+        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+    def _verify_roundrobin_assignment(self, consumer):
+        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+        assert assignment == set([
+            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+    def rolling_update_test(self):
+        """
+        Verify rolling updates of partition assignment strategies works correctly. In this
+        test, we use a rolling restart to change the group's assignment strategy from "range"
+        to "roundrobin." We verify after every restart that all members are still in the group
+        and that the correct assignment strategy was used.
+        """
+
+        # initialize the consumer using range assignment
+        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
+
+        consumer.start()
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+
+        # change consumer configuration to prefer round-robin assignment, but still support range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+        # restart one of the nodes and verify that we are still using range assignment
+        consumer.stop_node(consumer.nodes[0])
+        consumer.start_node(consumer.nodes[0])
+        self.await_all_members(consumer)
+        self._verify_range_assignment(consumer)
+        
+        # now restart the other node and verify that we have switched to round-robin
+        consumer.stop_node(consumer.nodes[1])
+        consumer.start_node(consumer.nodes[1])
+        self.await_all_members(consumer)
+        self._verify_roundrobin_assignment(consumer)
+
+        # if we want, we can now drop support for range assignment
+        consumer.assignment_strategy = self.ROUND_ROBIN
+        for node in consumer.nodes:
+            consumer.stop_node(node)
+            consumer.start_node(node)
+            self.await_all_members(consumer)
+            self._verify_roundrobin_assignment(consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/message_format_change.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/message_format_change.py b/tests/kafkatest/tests/message_format_change.py
deleted file mode 100644
index 357fd17..0000000
--- a/tests/kafkatest/tests/message_format_change.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.kafka import config_property
-import time
-
-
-class MessageFormatChangeTest(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.messages_per_producer = 100
-
-    def produce_and_consume(self, producer_version, consumer_version, group):
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic,
-                                           throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           version=KafkaVersion(producer_version))
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000,
-                                        message_validator=is_int, version=KafkaVersion(consumer_version))
-        self.consumer.group_id = group
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))
-        
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
-    def test_compatibility(self, producer_version, consumer_version):
-        """ This tests performs the following checks:
-        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
-        that produce to and consume from a 0.10.x cluster
-        1. initially the topic is using message format 0.9.0
-        2. change the message format version for topic to 0.10.0 on the fly.
-        3. change the message format version for topic back to 0.9.0 on the fly.
-        - The producers and consumers should not have any issue.
-        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
-        """
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
-       
-        self.kafka.start()
-        self.logger.info("First format change to 0.9.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
-        self.produce_and_consume(producer_version, consumer_version, "group1")
-
-        self.logger.info("Second format change to 0.10.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
-        self.produce_and_consume(producer_version, consumer_version, "group2")
-
-        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
-            self.logger.info("Third format change back to 0.9.0")
-            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
-            self.produce_and_consume(producer_version, consumer_version, "group3")
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/message_format_change_test.py b/tests/kafkatest/tests/message_format_change_test.py
new file mode 100644
index 0000000..357fd17
--- /dev/null
+++ b/tests/kafkatest/tests/message_format_change_test.py
@@ -0,0 +1,92 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.kafka import config_property
+import time
+
+
+class MessageFormatChangeTest(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.messages_per_producer = 100
+
+    def produce_and_consume(self, producer_version, consumer_version, group):
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic,
+                                           throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           version=KafkaVersion(producer_version))
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=KafkaVersion(consumer_version))
+        self.consumer.group_id = group
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+        
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
+    def test_compatibility(self, producer_version, consumer_version):
+        """ This tests performs the following checks:
+        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
+        that produce to and consume from a 0.10.x cluster
+        1. initially the topic is using message format 0.9.0
+        2. change the message format version for topic to 0.10.0 on the fly.
+        3. change the message format version for topic back to 0.9.0 on the fly.
+        - The producers and consumers should not have any issue.
+        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
+        """
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor": 3,
+                                                                    'configs': {"min.insync.replicas": 2}}})
+       
+        self.kafka.start()
+        self.logger.info("First format change to 0.9.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+        self.produce_and_consume(producer_version, consumer_version, "group1")
+
+        self.logger.info("Second format change to 0.10.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
+        self.produce_and_consume(producer_version, consumer_version, "group2")
+
+        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
+            self.logger.info("Third format change back to 0.9.0")
+            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+            self.produce_and_consume(producer_version, consumer_version, "group3")
+
+


Mime
View raw message