Repository: kafka
Updated Branches:
refs/heads/trunk e89a9ce1a -> 0d8cbbcb2
HOTFIX: Renamed tests to match expected suffix
ewencp gwenshap granders could you have a look please? Thanks.
Author: Eno Thereska <eno.thereska@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes #1096 from enothereska/systest-hotfix-name
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0d8cbbcb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0d8cbbcb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0d8cbbcb
Branch: refs/heads/trunk
Commit: 0d8cbbcb208ccaf1cb84df0440331d4cef064391
Parents: e89a9ce
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Fri Mar 18 12:01:56 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Mar 18 12:01:56 2016 -0700
----------------------------------------------------------------------
.../tests/compatibility_test_new_broker.py | 78 -----------------
.../tests/compatibility_test_new_broker_test.py | 78 +++++++++++++++++
.../kafkatest/tests/consumer_rolling_upgrade.py | 82 -----------------
.../tests/consumer_rolling_upgrade_test.py | 82 +++++++++++++++++
tests/kafkatest/tests/message_format_change.py | 92 --------------------
.../tests/message_format_change_test.py | 92 ++++++++++++++++++++
6 files changed, 252 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/compatibility_test_new_broker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test_new_broker.py b/tests/kafkatest/tests/compatibility_test_new_broker.py
deleted file mode 100644
index 2c261df..0000000
--- a/tests/kafkatest/tests/compatibility_test_new_broker.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.kafka import config_property
-
-# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
-class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
-
- def __init__(self, test_context):
- super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
-
- def setUp(self):
- self.topic = "test_topic"
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
-
- self.zk.start()
-
- # Producer and consumer
- self.producer_throughput = 10000
- self.num_producers = 1
- self.num_consumers = 1
- self.messages_per_producer = 1000
-
- @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2),
compression_types=["none"], timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"],
timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"],
timestamp_type=None)
- @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"],
timestamp_type=None)
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"],
new_consumer=True, timestamp_type=None)
- @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"],
new_consumer=True, timestamp_type=str("CreateTime"))
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"],
new_consumer=True, timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"],
new_consumer=True, timestamp_type=str("LogAppendTime"))
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"],
timestamp_type=str("LogAppendTime"))
- def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False,
timestamp_type=None):
-
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK,
topics={self.topic: {
- "partitions": 3,
- "replication-factor":
3,
- 'configs': {"min.insync.replicas":
2}}})
- for node in self.kafka.nodes:
- if timestamp_type is not None:
- node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
- self.kafka.start()
-
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
- self.topic, throughput=self.producer_throughput,
- message_validator=is_int,
- compression_types=compression_types,
- version=KafkaVersion(producer_version))
-
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
- self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
- message_validator=is_int, version=KafkaVersion(consumer_version))
-
- self.run_produce_consume_validate(lambda: wait_until(
- lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
- timeout_sec=120, backoff_sec=1,
- err_msg="Producer did not produce all messages in reasonable amount of time"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/compatibility_test_new_broker_test.py
new file mode 100644
index 0000000..2c261df
--- /dev/null
+++ b/tests/kafkatest/tests/compatibility_test_new_broker_test.py
@@ -0,0 +1,78 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.kafka import config_property
+
+# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
+class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
+
+ def __init__(self, test_context):
+ super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+
+ self.zk.start()
+
+ # Producer and consumer
+ self.producer_throughput = 10000
+ self.num_producers = 1
+ self.num_consumers = 1
+ self.messages_per_producer = 1000
+
+ @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2),
compression_types=["none"], timestamp_type=None)
+ @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"],
timestamp_type=None)
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"],
timestamp_type=None)
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"],
timestamp_type=None)
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"],
new_consumer=True, timestamp_type=None)
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"],
new_consumer=True, timestamp_type=str("CreateTime"))
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"],
new_consumer=True, timestamp_type=str("LogAppendTime"))
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"],
new_consumer=True, timestamp_type=str("LogAppendTime"))
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"],
timestamp_type=str("LogAppendTime"))
+ def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False,
timestamp_type=None):
+
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK,
topics={self.topic: {
+ "partitions": 3,
+ "replication-factor":
3,
+ 'configs': {"min.insync.replicas":
2}}})
+ for node in self.kafka.nodes:
+ if timestamp_type is not None:
+ node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
+ self.kafka.start()
+
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic, throughput=self.producer_throughput,
+ message_validator=is_int,
+ compression_types=compression_types,
+ version=KafkaVersion(producer_version))
+
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+ self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+ message_validator=is_int, version=KafkaVersion(consumer_version))
+
+ self.run_produce_consume_validate(lambda: wait_until(
+ lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+ timeout_sec=120, backoff_sec=1,
+ err_msg="Producer did not produce all messages in reasonable amount of time"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/consumer_rolling_upgrade.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade.py b/tests/kafkatest/tests/consumer_rolling_upgrade.py
deleted file mode 100644
index 3cd3c7c..0000000
--- a/tests/kafkatest/tests/consumer_rolling_upgrade.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
- TOPIC = "test_topic"
- NUM_PARTITIONS = 4
- RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
- ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
-
- def __init__(self, test_context):
- super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
- num_zk=1, num_brokers=1, topics={
- self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
- })
-
- def _verify_range_assignment(self, consumer):
- # range assignment should give us two partition sets: (0, 1) and (2, 3)
- assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
- assert assignment == set([
- frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
- frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
-
- def _verify_roundrobin_assignment(self, consumer):
- assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
- assert assignment == set([
- frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
- frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
-
- def rolling_update_test(self):
- """
- Verify rolling updates of partition assignment strategies works correctly. In this
- test, we use a rolling restart to change the group's assignment strategy from "range"
- to "roundrobin." We verify after every restart that all members are still in the group
- and that the correct assignment strategy was used.
- """
-
- # initialize the consumer using range assignment
- consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
-
- consumer.start()
- self.await_all_members(consumer)
- self._verify_range_assignment(consumer)
-
- # change consumer configuration to prefer round-robin assignment, but still support range assignment
- consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
-
- # restart one of the nodes and verify that we are still using range assignment
- consumer.stop_node(consumer.nodes[0])
- consumer.start_node(consumer.nodes[0])
- self.await_all_members(consumer)
- self._verify_range_assignment(consumer)
-
- # now restart the other node and verify that we have switched to round-robin
- consumer.stop_node(consumer.nodes[1])
- consumer.start_node(consumer.nodes[1])
- self.await_all_members(consumer)
- self._verify_roundrobin_assignment(consumer)
-
- # if we want, we can now drop support for range assignment
- consumer.assignment_strategy = self.ROUND_ROBIN
- for node in consumer.nodes:
- consumer.stop_node(node)
- consumer.start_node(node)
- self.await_all_members(consumer)
- self._verify_roundrobin_assignment(consumer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/consumer_rolling_upgrade_test.py
new file mode 100644
index 0000000..3cd3c7c
--- /dev/null
+++ b/tests/kafkatest/tests/consumer_rolling_upgrade_test.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.kafka import TopicPartition
+
+class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
+ TOPIC = "test_topic"
+ NUM_PARTITIONS = 4
+ RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
+ ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
+
+ def __init__(self, test_context):
+ super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
+ num_zk=1, num_brokers=1, topics={
+ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
+ })
+
+ def _verify_range_assignment(self, consumer):
+ # range assignment should give us two partition sets: (0, 1) and (2, 3)
+ assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
+ assert assignment == set([
+ frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
+ frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
+
+ def _verify_roundrobin_assignment(self, consumer):
+ assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
+ assert assignment == set([
+ frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
+ frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
+
+ def rolling_update_test(self):
+ """
+ Verify rolling updates of partition assignment strategies works correctly. In this
+ test, we use a rolling restart to change the group's assignment strategy from "range"
+ to "roundrobin." We verify after every restart that all members are still in the group
+ and that the correct assignment strategy was used.
+ """
+
+ # initialize the consumer using range assignment
+ consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
+
+ consumer.start()
+ self.await_all_members(consumer)
+ self._verify_range_assignment(consumer)
+
+ # change consumer configuration to prefer round-robin assignment, but still support range assignment
+ consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
+
+ # restart one of the nodes and verify that we are still using range assignment
+ consumer.stop_node(consumer.nodes[0])
+ consumer.start_node(consumer.nodes[0])
+ self.await_all_members(consumer)
+ self._verify_range_assignment(consumer)
+
+ # now restart the other node and verify that we have switched to round-robin
+ consumer.stop_node(consumer.nodes[1])
+ consumer.start_node(consumer.nodes[1])
+ self.await_all_members(consumer)
+ self._verify_roundrobin_assignment(consumer)
+
+ # if we want, we can now drop support for range assignment
+ consumer.assignment_strategy = self.ROUND_ROBIN
+ for node in consumer.nodes:
+ consumer.stop_node(node)
+ consumer.start_node(node)
+ self.await_all_members(consumer)
+ self._verify_roundrobin_assignment(consumer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/message_format_change.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/message_format_change.py b/tests/kafkatest/tests/message_format_change.py
deleted file mode 100644
index 357fd17..0000000
--- a/tests/kafkatest/tests/message_format_change.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# Copyright 2015 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.kafka import config_property
-import time
-
-
-class MessageFormatChangeTest(ProduceConsumeValidateTest):
-
- def __init__(self, test_context):
- super(MessageFormatChangeTest, self).__init__(test_context=test_context)
-
- def setUp(self):
- self.topic = "test_topic"
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
-
- self.zk.start()
-
- # Producer and consumer
- self.producer_throughput = 10000
- self.num_producers = 1
- self.num_consumers = 1
- self.messages_per_producer = 100
-
- def produce_and_consume(self, producer_version, consumer_version, group):
- self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
- self.topic,
- throughput=self.producer_throughput,
- message_validator=is_int,
- version=KafkaVersion(producer_version))
- self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
- self.topic, consumer_timeout_ms=30000,
- message_validator=is_int, version=KafkaVersion(consumer_version))
- self.consumer.group_id = group
- self.run_produce_consume_validate(lambda: wait_until(
- lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
- timeout_sec=120, backoff_sec=1,
- err_msg="Producer did not produce all messages in reasonable amount of time"))
-
- @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
- @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
- def test_compatibility(self, producer_version, consumer_version):
- """ This tests performs the following checks:
- The workload is a mix of 0.9.x and 0.10.x producers and consumers
- that produce to and consume from a 0.10.x cluster
- 1. initially the topic is using message format 0.9.0
- 2. change the message format version for topic to 0.10.0 on the fly.
- 3. change the message format version for topic back to 0.9.0 on the fly.
- - The producers and consumers should not have any issue.
- - Note that for 0.9.x consumers/producers we only do steps 1 and 2
- """
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK,
topics={self.topic: {
- "partitions": 3,
- "replication-factor":
3,
- 'configs': {"min.insync.replicas":
2}}})
-
- self.kafka.start()
- self.logger.info("First format change to 0.9.0")
- self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
- self.produce_and_consume(producer_version, consumer_version, "group1")
-
- self.logger.info("Second format change to 0.10.0")
- self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
- self.produce_and_consume(producer_version, consumer_version, "group2")
-
- if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
- self.logger.info("Third format change back to 0.9.0")
- self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
- self.produce_and_consume(producer_version, consumer_version, "group3")
-
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d8cbbcb/tests/kafkatest/tests/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/message_format_change_test.py b/tests/kafkatest/tests/message_format_change_test.py
new file mode 100644
index 0000000..357fd17
--- /dev/null
+++ b/tests/kafkatest/tests/message_format_change_test.py
@@ -0,0 +1,92 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.kafka import config_property
+import time
+
+
+class MessageFormatChangeTest(ProduceConsumeValidateTest):
+
+ def __init__(self, test_context):
+ super(MessageFormatChangeTest, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+
+ self.zk.start()
+
+ # Producer and consumer
+ self.producer_throughput = 10000
+ self.num_producers = 1
+ self.num_consumers = 1
+ self.messages_per_producer = 100
+
+ def produce_and_consume(self, producer_version, consumer_version, group):
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic,
+ throughput=self.producer_throughput,
+ message_validator=is_int,
+ version=KafkaVersion(producer_version))
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+ self.topic, consumer_timeout_ms=30000,
+ message_validator=is_int, version=KafkaVersion(consumer_version))
+ self.consumer.group_id = group
+ self.run_produce_consume_validate(lambda: wait_until(
+ lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+ timeout_sec=120, backoff_sec=1,
+ err_msg="Producer did not produce all messages in reasonable amount of time"))
+
+ @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
+ @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
+ def test_compatibility(self, producer_version, consumer_version):
+ """ This tests performs the following checks:
+ The workload is a mix of 0.9.x and 0.10.x producers and consumers
+ that produce to and consume from a 0.10.x cluster
+ 1. initially the topic is using message format 0.9.0
+ 2. change the message format version for topic to 0.10.0 on the fly.
+ 3. change the message format version for topic back to 0.9.0 on the fly.
+ - The producers and consumers should not have any issue.
+ - Note that for 0.9.x consumers/producers we only do steps 1 and 2
+ """
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK,
topics={self.topic: {
+ "partitions": 3,
+ "replication-factor":
3,
+ 'configs': {"min.insync.replicas":
2}}})
+
+ self.kafka.start()
+ self.logger.info("First format change to 0.9.0")
+ self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+ self.produce_and_consume(producer_version, consumer_version, "group1")
+
+ self.logger.info("Second format change to 0.10.0")
+ self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
+ self.produce_and_consume(producer_version, consumer_version, "group2")
+
+ if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
+ self.logger.info("Third format change back to 0.9.0")
+ self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+ self.produce_and_consume(producer_version, consumer_version, "group3")
+
+
|