kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3202: System test that changes message version on the fly
Date Thu, 17 Mar 2016 22:37:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 579d473ce -> f57dabbe5


KAFKA-3202: System test that changes message version on the fly

becketqin apovzner please have a look. becketqin the test fails when the producer and consumer
are 0.9.x and the message format changes on the fly.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ewen Cheslack-Postava, Ismael Juma, Gwen Shapira

Closes #1070 from enothereska/kafka-3202-format-change-fly


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f57dabbe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f57dabbe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f57dabbe

Branch: refs/heads/trunk
Commit: f57dabbe56b3db40c06a1946d4cecd0a144b6ac4
Parents: 579d473
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Mar 17 15:37:37 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Mar 17 15:37:37 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py        | 13 ++-
 tests/kafkatest/services/kafka/version.py      |  4 +
 tests/kafkatest/tests/message_format_change.py | 92 +++++++++++++++++++++
 3 files changed, 107 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f57dabbe/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 788d41b..33ece35 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -242,7 +242,7 @@ class KafkaService(JmxMixin, Service):
         cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s --partitions %(partitions)d
--replication-factor %(replication)d" % {
                 'zk_connect': self.zk.connect_setting(),
                 'topic': topic_cfg.get("topic"),
-                'partitions': topic_cfg.get('partitions', 1),
+                'partitions': topic_cfg.get('partitions', 1), 
                 'replication': topic_cfg.get('replication-factor', 1)
             }
 
@@ -267,6 +267,15 @@ class KafkaService(JmxMixin, Service):
         for line in node.account.ssh_capture(cmd):
             output += line
         return output
+    
+    def alter_message_format(self, topic, msg_format_version, node=None):
+        if node is None:
+            node = self.nodes[0]
+        self.logger.info("Altering message format version for topic %s with format %s", topic,
msg_format_version)
+        cmd = "/opt/%s/bin/kafka-configs.sh --zookeeper %s --entity-name %s --entity-type
topics --alter --add-config message.format.version=%s" % \
+              (kafka_dir(node), self.zk.connect_setting(), topic, msg_format_version)
+        self.logger.info("Running alter message format command...\n%s" % cmd)
+        node.account.ssh(cmd)
 
     def parse_describe_topic(self, topic_description):
         """Parse output of kafka-topics.sh --describe (or describe_topic() method above),
which is a string of form
@@ -508,4 +517,4 @@ class KafkaService(JmxMixin, Service):
         for line in node.account.ssh_capture(cmd):
             output += line
         self.logger.debug(output)
-        return output
\ No newline at end of file
+        return output

http://git-wip-us.apache.org/repos/asf/kafka/blob/f57dabbe/tests/kafkatest/services/kafka/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/version.py b/tests/kafkatest/services/kafka/version.py
index 761d91b..dc2582b 100644
--- a/tests/kafkatest/services/kafka/version.py
+++ b/tests/kafkatest/services/kafka/version.py
@@ -63,3 +63,7 @@ LATEST_0_8_2 = V_0_8_2_2
 V_0_9_0_0 = KafkaVersion("0.9.0.0")
 V_0_9_0_1 = KafkaVersion("0.9.0.1")
 LATEST_0_9 = V_0_9_0_1
+
+# 0.10.0.X versions
+V_0_10_0_0 = KafkaVersion("0.10.0.0")
+LATEST_0_10 = V_0_10_0_0

http://git-wip-us.apache.org/repos/asf/kafka/blob/f57dabbe/tests/kafkatest/tests/message_format_change.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/message_format_change.py b/tests/kafkatest/tests/message_format_change.py
new file mode 100644
index 0000000..357fd17
--- /dev/null
+++ b/tests/kafkatest/tests/message_format_change.py
@@ -0,0 +1,92 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.kafka import config_property
+import time
+
+
+class MessageFormatChangeTest(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.messages_per_producer = 100
+
+    def produce_and_consume(self, producer_version, consumer_version, group):
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic,
+                                           throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           version=KafkaVersion(producer_version))
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=KafkaVersion(consumer_version))
+        self.consumer.group_id = group
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+        
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
+    def test_compatibility(self, producer_version, consumer_version):
+        """ This tests performs the following checks:
+        The workload is a mix of 0.9.x and 0.10.x producers and consumers 
+        that produce to and consume from a 0.10.x cluster
+        1. initially the topic is using message format 0.9.0
+        2. change the message format version for topic to 0.10.0 on the fly.
+        3. change the message format version for topic back to 0.9.0 on the fly.
+        - The producers and consumers should not have any issue.
+        - Note that for 0.9.x consumers/producers we only do steps 1 and 2
+        """
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK,
topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor":
3,
+                                                                    'configs': {"min.insync.replicas":
2}}})
+       
+        self.kafka.start()
+        self.logger.info("First format change to 0.9.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+        self.produce_and_consume(producer_version, consumer_version, "group1")
+
+        self.logger.info("Second format change to 0.10.0")
+        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
+        self.produce_and_consume(producer_version, consumer_version, "group2")
+
+        if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
+            self.logger.info("Third format change back to 0.9.0")
+            self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
+            self.produce_and_consume(producer_version, consumer_version, "group3")
+
+


Mime
View raw message