kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4508; System test that runs client against older versions of the broker
Date Wed, 04 Jan 2017 13:55:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7959bf506 -> 0cd5afdb6


KAFKA-4508; System test that runs client against older versions of the broker

In reality, we’ll only test older brokers after KAFKA-4462 is fully implemented.

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2263 from cmccabe/KAFKA-4508


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0cd5afdb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0cd5afdb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0cd5afdb

Branch: refs/heads/trunk
Commit: 0cd5afdb611d6d35009b9806be6bdf9874999bb3
Parents: 7959bf5
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Wed Jan 4 13:53:29 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Jan 4 13:55:36 2017 +0000

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py         |  5 ++
 .../client/test_producer_consumer_compat.py     | 75 ++++++++++++++++++++
 2 files changed, 80 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0cd5afdb/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index f773d8d..716c2d2 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -119,6 +119,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             node.version = version
             node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
 
+
+    def set_version(self, version):
+        for node in self.nodes:
+            node.version = version
+
     @property
     def security_config(self):
         return SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cd5afdb/tests/kafkatest/tests/client/test_producer_consumer_compat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/test_producer_consumer_compat.py b/tests/kafkatest/tests/client/test_producer_consumer_compat.py
new file mode 100644
index 0000000..5a14fc5
--- /dev/null
+++ b/tests/kafkatest/tests/client/test_producer_consumer_compat.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int_with_prefix
+from kafkatest.version import TRUNK, V_0_10_0_0, KafkaVersion
+
+class TestProducerConsumerCompat(ProduceConsumeValidateTest):
+    """
+    These tests validate that we can use a new client to consume from older
+    brokers.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(TestProducerConsumerCompat, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic:{
+                                                                    "partitions": 10,
+                                                                    "replication-factor":
2}})
+        self.num_partitions = 10
+        self.timeout_sec = 60
+        self.producer_throughput = 1000
+        self.num_producers = 2
+        self.messages_per_producer = 1000
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    def min_cluster_size(self):
+        # Override this since we're adding services outside of the constructor
+        return super(TestProducerConsumerCompat, self).min_cluster_size() + self.num_producers
+ self.num_consumers
+
+    # TODO: when KAFKA-4462 is fully implemented, we should test other versions here.
+    @parametrize(broker_version=str(TRUNK))
+    def test_produce_consume(self, broker_version):
+        print("running producer_consumer_compat with broker_version = %s" % broker_version)
+        self.kafka.set_version(KafkaVersion(broker_version))
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = self.kafka.security_protocol
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int_with_prefix)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
self.topic,
+                                        consumer_timeout_ms=60000,
+                                        message_validator=is_int_with_prefix)
+        self.kafka.start()
+
+        self.run_produce_consume_validate(lambda: wait_until(
+            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
+            timeout_sec=120, backoff_sec=1,
+            err_msg="Producer did not produce all messages in reasonable amount of time"))
+


Mime
View raw message