kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3188: Compatibility test for old and new clients with 0.10 broker
Date Thu, 17 Mar 2016 20:17:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 30e78fa00 -> a1eb12d7c


KAFKA-3188: Compatibility test for old and new clients with 0.10 broker

apovzner becketqin please have a look if you can. Thanks.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Anna Povzner, Gwen Shapira

Closes #1059 from enothereska/kafka-3188-compatibility


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1eb12d7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1eb12d7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1eb12d7

Branch: refs/heads/trunk
Commit: a1eb12d7c6ad9422b9cf24b670d1b4c11227b03e
Parents: 30e78fa
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Mar 17 13:17:01 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Mar 17 13:17:01 2016 -0700

----------------------------------------------------------------------
 .../kafkatest/services/kafka/config_property.py |  2 +-
 .../tests/compatibility_test_new_broker.py      | 78 ++++++++++++++++++++
 2 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1eb12d7/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 8f30f13..e1801ef 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -41,7 +41,7 @@ ZOOKEEPER_CONNECT = "zookeeper.connect"
 ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms"
 INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version"
 MESSAGE_FORMAT_VERSION = "log.message.format.version"
-
+MESSAGE_TIMESTAMP_TYPE = "message.timestamp.type"
 
 
 """

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1eb12d7/tests/kafkatest/tests/compatibility_test_new_broker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test_new_broker.py b/tests/kafkatest/tests/compatibility_test_new_broker.py
new file mode 100644
index 0000000..2c261df
--- /dev/null
+++ b/tests/kafkatest/tests/compatibility_test_new_broker.py
@@ -0,0 +1,78 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils import is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.services.kafka import config_property
+
+# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and
new clients (e.g., 0.9.x)
+class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(ClientCompatibilityTestNewBroker, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+            
+        self.zk.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.messages_per_producer = 1000
+
+    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2),
compression_types=["none"], timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"],
timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"],
timestamp_type=None)
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"],
timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"],
new_consumer=True, timestamp_type=None)
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"],
new_consumer=True, timestamp_type=str("CreateTime"))
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"],
new_consumer=True, timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"],
new_consumer=True, timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"],
timestamp_type=str("LogAppendTime"))
+    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False,
timestamp_type=None):
+       
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK,
topics={self.topic: {
+                                                                    "partitions": 3,
+                                                                    "replication-factor":
3,
+                                                                    'configs': {"min.insync.replicas":
2}}})
+        for node in self.kafka.nodes:
+            if timestamp_type is not None:
+                node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
+        self.kafka.start()
+         
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           compression_types=compression_types,
+                                           version=KafkaVersion(producer_version))
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+                                        message_validator=is_int, version=KafkaVersion(consumer_version))
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))


Mime
View raw message