kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: Rolling bounce upgrade fixed broker system test (#4690)
Date Thu, 22 Mar 2018 23:02:19 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 286216b  MINOR: Rolling bounce upgrade fixed broker system test (#4690)
286216b is described below

commit 286216b56e27d8db18f7268b63cbd8d7f8bd8830
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Thu Mar 22 19:02:16 2018 -0400

    MINOR: Rolling bounce upgrade fixed broker system test (#4690)
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>,
Matthias J. Sax <matthias@confluent.io>
---
 tests/kafkatest/tests/streams/base_streams_test.py |  99 ++++++++++++++++
 .../streams/streams_broker_down_resilience_test.py | 124 ++++++++-------------
 .../streams_multiple_rolling_upgrade_test.py       | 116 +++++++++++++++++++
 .../tests/streams/streams_standby_replica_test.py  |  73 ++----------
 tests/kafkatest/version.py                         |   6 +-
 5 files changed, 277 insertions(+), 141 deletions(-)

diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
new file mode 100644
index 0000000..6502773
--- /dev/null
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.kafka_test import KafkaTest
+
+
+class BaseStreamsTest(KafkaTest):
+    """
+    Helper class that contains methods for producing and consuming
+    messages and verification of results from log files
+
+    Extends KafkaTest which manages setting up Kafka Cluster and Zookeeper
+    see tests/kafkatest/tests/kafka_test.py for more info
+    """
+    def __init__(self, test_context,  topics, num_zk=1, num_brokers=3):
+        super(BaseStreamsTest, self).__init__(test_context, num_zk, num_brokers, topics)
+
+    def get_consumer(self, client_id, topic, num_messages):
+        return VerifiableConsumer(self.test_context,
+                                  1,
+                                  self.kafka,
+                                  topic,
+                                  client_id,
+                                  max_messages=num_messages)
+
+    def get_producer(self, topic, num_messages):
+        return VerifiableProducer(self.test_context,
+                                  1,
+                                  self.kafka,
+                                  topic,
+                                  max_messages=num_messages,
+                                  acks=1)
+
+    def assert_produce_consume(self,
+                               streams_source_topic,
+                               streams_sink_topic,
+                               client_id,
+                               test_state,
+                               num_messages=5,
+                               timeout_sec=60):
+
+        self.assert_produce(streams_source_topic, test_state, num_messages, timeout_sec)
+
+        self.assert_consume(client_id, test_state, streams_sink_topic, num_messages, timeout_sec)
+
+    def assert_produce(self, topic, test_state, num_messages=5, timeout_sec=60):
+        producer = self.get_producer(topic, num_messages)
+        producer.start()
+
+        wait_until(lambda: producer.num_acked >= num_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="At %s failed to send messages " % test_state)
+
+    def assert_consume(self, client_id, test_state, topic, num_messages=5, timeout_sec=60):
+        consumer = self.get_consumer(client_id, topic, num_messages)
+        consumer.start()
+
+        wait_until(lambda: consumer.total_consumed() >= num_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="At %s streams did not process messages in %s seconds " % (test_state,
timeout_sec))
+
+    @staticmethod
+    def get_configs(extra_configs=""):
+        # Consumer max.poll.interval > min(max.block.ms, (retries + 1) * request.timeout)
+        consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
+        retries_config = "producer.retries=2"
+        request_timeout = "producer.request.timeout.ms=15000"
+        max_block_ms = "producer.max.block.ms=30000"
+
+        # java code expects configs in key=value,key=value format
+        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout
+ "," + max_block_ms + extra_configs
+
+        return updated_configs
+
+    def wait_for_verification(self, processor, message, file, num_lines=1):
+        wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
+                   timeout_sec=60,
+                   err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))
+
+    @staticmethod
+    def verify_from_file(processor, message, file):
+        result = processor.node.account.ssh_output("grep '%s' %s | wc -l" % (message, file),
allow_fail=False)
+        return int(result)
+
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index add6247..11dc424 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -14,16 +14,11 @@
 # limitations under the License.
 
 import time
-from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsBrokerDownResilienceService
-from kafkatest.services.verifiable_consumer import VerifiableConsumer
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
 
 
-class StreamsBrokerDownResilience(Test):
+class StreamsBrokerDownResilience(BaseStreamsTest):
     """
     This test validates that Streams is resilient to a broker
     being down longer than specified timeouts in configs
@@ -31,73 +26,14 @@ class StreamsBrokerDownResilience(Test):
 
     inputTopic = "streamsResilienceSource"
     outputTopic = "streamsResilienceSink"
+    client_id = "streams-broker-resilience-verify-consumer"
     num_messages = 5
 
     def __init__(self, test_context):
-        super(StreamsBrokerDownResilience, self).__init__(test_context=test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context,
-                                  num_nodes=1,
-                                  zk=self.zk,
-                                  topics={
-                                      self.inputTopic: {'partitions': 3, 'replication-factor':
1},
-                                      self.outputTopic: {'partitions': 1, 'replication-factor':
1}
-                                  })
-
-    def get_consumer(self, num_messages):
-        return VerifiableConsumer(self.test_context,
-                                  1,
-                                  self.kafka,
-                                  self.outputTopic,
-                                  "stream-broker-resilience-verify-consumer",
-                                  max_messages=num_messages)
-
-    def get_producer(self, num_messages):
-        return VerifiableProducer(self.test_context,
-                                  1,
-                                  self.kafka,
-                                  self.inputTopic,
-                                  max_messages=num_messages,
-                                  acks=1)
-
-    def assert_produce_consume(self, test_state, num_messages=5):
-        producer = self.get_producer(num_messages)
-        producer.start()
-
-        wait_until(lambda: producer.num_acked >= num_messages,
-                   timeout_sec=30,
-                   err_msg="At %s failed to send messages " % test_state)
-
-        consumer = self.get_consumer(num_messages)
-        consumer.start()
-
-        wait_until(lambda: consumer.total_consumed() >= num_messages,
-                   timeout_sec=60,
-                   err_msg="At %s streams did not process messages in 60 seconds " % test_state)
-
-    @staticmethod
-    def get_configs(extra_configs=""):
-        # Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout)
-        consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
-        retries_config = "producer.retries=2"
-        request_timeout = "producer.request.timeout.ms=15000"
-        max_block_ms = "producer.max.block.ms=30000"
-
-        # java code expects configs in key=value,key=value format
-        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout
+ "," + max_block_ms + extra_configs
-
-        return updated_configs
-
-    def wait_for_verification(self, processor, message, file, num_lines=1):
-        wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
-                   timeout_sec=60,
-                   err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))
-
-    @staticmethod
-    def verify_from_file(processor, message, file):
-        result = processor.node.account.ssh_output("grep '%s' %s | wc -l" % (message, file),
allow_fail=False)
-        return int(result)
-
+        super(StreamsBrokerDownResilience, self).__init__(test_context,
+                                                          topics={self.inputTopic: {'partitions':
3, 'replication-factor': 1},
+                                                                  self.outputTopic: {'partitions':
1, 'replication-factor': 1}},
+                                                          num_brokers=1)
 
     def setUp(self):
         self.zk.start()
@@ -114,7 +50,10 @@ class StreamsBrokerDownResilience(Test):
 
         # until KIP-91 is merged we'll only send 5 messages to assert Kafka Streams is running
before taking the broker down
         # After KIP-91 is merged we'll continue to send messages the duration of the test
-        self.assert_produce_consume("before_broker_stop")
+        self.assert_produce_consume(self.inputTopic,
+                                    self.outputTopic,
+                                    self.client_id,
+                                    "before_broker_stop")
 
         node = self.kafka.leader(self.inputTopic)
 
@@ -124,7 +63,11 @@ class StreamsBrokerDownResilience(Test):
 
         self.kafka.start_node(node)
 
-        self.assert_produce_consume("after_broker_stop")
+        self.assert_produce_consume(self.inputTopic,
+                                    self.outputTopic,
+                                    self.client_id,
+                                    "after_broker_stop",
+                                    timeout_sec=120)
 
         self.kafka.stop()
 
@@ -156,7 +99,12 @@ class StreamsBrokerDownResilience(Test):
         self.kafka.start_node(node)
 
         # assert streams can process when starting with broker down
-        self.assert_produce_consume("running_with_broker_down_initially", num_messages=9)
+        self.assert_produce_consume(self.inputTopic,
+                                    self.outputTopic,
+                                    self.client_id,
+                                    "running_with_broker_down_initially",
+                                    num_messages=9,
+                                    timeout_sec=120)
 
         message = "processed3messages"
         # need to show all 3 instances processed messages
@@ -184,7 +132,12 @@ class StreamsBrokerDownResilience(Test):
         self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING",
processor_3.LOG_FILE)
 
         # assert streams can process when starting with broker up
-        self.assert_produce_consume("waiting for rebalance to complete", num_messages=9)
+        self.assert_produce_consume(self.inputTopic,
+                                    self.outputTopic,
+                                    self.client_id,
+                                    "waiting for rebalance to complete",
+                                    num_messages=9,
+                                    timeout_sec=120)
 
         message = "processed3messages"
 
@@ -204,7 +157,12 @@ class StreamsBrokerDownResilience(Test):
 
         self.kafka.start_node(node)
 
-        self.assert_produce_consume("sending_message_after_stopping_streams_instance_bouncing_broker",
num_messages=9)
+        self.assert_produce_consume(self.inputTopic,
+                                    self.outputTopic,
+                                    self.client_id,
+                                    "sending_message_after_stopping_streams_instance_bouncing_broker",
+                                    num_messages=9,
+                                    timeout_sec=120)
 
         self.wait_for_verification(processor_3, "processed9messages", processor_3.STDOUT_FILE)
 
@@ -228,7 +186,12 @@ class StreamsBrokerDownResilience(Test):
         self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING",
processor_3.LOG_FILE)
 
         # assert streams can process when starting with broker up
-        self.assert_produce_consume("waiting for rebalance to complete", num_messages=9)
+        self.assert_produce_consume(self.inputTopic,
+                                    self.outputTopic,
+                                    self.client_id,
+                                    "waiting for rebalance to complete",
+                                    num_messages=9,
+                                    timeout_sec=120)
 
         message = "processed3messages"
 
@@ -245,6 +208,11 @@ class StreamsBrokerDownResilience(Test):
 
         self.kafka.start_node(node)
 
-        self.assert_produce_consume("sending_message_after_hard_bouncing_streams_instance_bouncing_broker",
num_messages=9)
+        self.assert_produce_consume(self.inputTopic,
+                                    self.outputTopic,
+                                    self.client_id,
+                                    "sending_message_after_hard_bouncing_streams_instance_bouncing_broker",
+                                    num_messages=9,
+                                    timeout_sec=120)
 
         self.kafka.stop()
diff --git a/tests/kafkatest/tests/streams/streams_multiple_rolling_upgrade_test.py b/tests/kafkatest/tests/streams/streams_multiple_rolling_upgrade_test.py
new file mode 100644
index 0000000..3fbf67d
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_multiple_rolling_upgrade_test.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
+from kafkatest.version import LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0,  DEV_BRANCH, KafkaVersion
+
+
+class StreamsMultipleRollingUpgradeTest(BaseStreamsTest):
+    """
+     This test will verify a rolling upgrade of multiple streams
+     applications against all versions of streams against a single
+     broker version.
+
+     As new releases come out, just update the streams_upgrade_versions array to have the
latest version
+     included in the list.
+
+     A prerequisite for this test to succeed
+     is the inclusion of all parametrized versions of kafka in kafka/vagrant/base.sh
+     (search for get_kafka()).
+     As new versions are released the kafka/tests/kafkatest/version.py file
+     needs to be updated as well.
+
+     You can find what's been uploaded to S3 with the following command
+
+     aws s3api list-objects --bucket kafka-packages --query 'Contents[].{Key:Key}'
+    """
+    # adding new version to this list will cover broker and streams version
+    streams_upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0),
str(DEV_BRANCH)]
+
+    def __init__(self, test_context):
+        super(StreamsMultipleRollingUpgradeTest, self).__init__(test_context,
+                                                                topics={
+                                                                    'echo': {'partitions':
5, 'replication-factor': 1},
+                                                                    'data': {'partitions':
5, 'replication-factor': 1},
+                                                                    'min': {'partitions':
5, 'replication-factor': 1},
+                                                                    'max': {'partitions':
5, 'replication-factor': 1},
+                                                                    'sum': {'partitions':
5, 'replication-factor': 1},
+                                                                    'dif': {'partitions':
5, 'replication-factor': 1},
+                                                                    'cnt': {'partitions':
5, 'replication-factor': 1},
+                                                                    'avg': {'partitions':
5, 'replication-factor': 1},
+                                                                    'wcnt': {'partitions':
5, 'replication-factor': 1},
+                                                                    'tagg': {'partitions':
5, 'replication-factor': 1}
+                                                                })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.processor_1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor_2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor_3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+
+        # already on trunk version at end of upgrades so get rid of it
+        self.streams_downgrade_versions = self.streams_upgrade_versions[:-1]
+        self.streams_downgrade_versions.reverse()
+
+        self.processors = [self.processor_1, self.processor_2, self.processor_3]
+
+        self.started = False
+
+    def setUp(self):
+        self.zk.start()
+
+    def upgrade_and_verify_start(self, processors, to_version):
+        for processor in processors:
+            self.logger.info("Updating node %s to version %s" % (processor.node.account,
to_version))
+            node = processor.node
+            if self.started:
+                self.stop(processor)
+            node.version = KafkaVersion(to_version)
+            processor.start()
+            self.wait_for_verification(processor, "initializing processor: topic", processor.STDOUT_FILE)
+
+        self.started = True
+
+    def stop(self, processor):
+        processor.stop()
+        self.wait_for_verification(processor, "SMOKE-TEST-CLIENT-CLOSED", processor.STDOUT_FILE)
+
+    def update_processors_and_verify(self, versions):
+        for version in versions:
+            self.upgrade_and_verify_start(self.processors, version)
+        self.run_data_and_verify()
+
+    def run_data_and_verify(self):
+        self.driver.start()
+        self.wait_for_verification(self.driver, "ALL-RECORDS-DELIVERED", self.driver.STDOUT_FILE)
+        self.driver.stop()
+
+    @cluster(num_nodes=9)
+    @matrix(broker_version=streams_upgrade_versions)
+    def test_rolling_upgrade_downgrade_multiple_apps(self, broker_version):
+        self.kafka.set_version(KafkaVersion(broker_version))
+        self.kafka.start()
+
+        # verification step run after each upgrade
+        self.update_processors_and_verify(self.streams_upgrade_versions)
+
+        # with order reversed now we test downgrading, verification run after each downgrade
+        self.update_processors_and_verify(self.streams_downgrade_versions)
+
+        for processor in self.processors:
+            self.stop(processor)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index 533d4b5..b77326f 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -13,16 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsRepeatingIntegerKeyProducerService
 from kafkatest.services.streams import StreamsStandbyTaskService
-from kafkatest.services.verifiable_consumer import VerifiableConsumer
-from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
 
 
-class StreamsStandbyTask(Test):
+class StreamsStandbyTask(BaseStreamsTest):
     """
     This test validates using standby tasks helps with rebalance times
     additionally verifies standby replicas continue to work in the
@@ -32,64 +28,17 @@ class StreamsStandbyTask(Test):
     streams_source_topic = "standbyTaskSource1"
     streams_sink_topic_1 = "standbyTaskSink1"
     streams_sink_topic_2 = "standbyTaskSink2"
+    client_id = "stream-broker-resilience-verify-consumer"
 
     num_messages = 60000
 
     def __init__(self, test_context):
-        super(StreamsStandbyTask, self).__init__(test_context=test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context,
-                                  num_nodes=3,
-                                  zk=self.zk,
-                                  topics={
-                                      self.streams_source_topic: {'partitions': 6, 'replication-factor':
1},
-                                      self.streams_sink_topic_1: {'partitions': 1, 'replication-factor':
1},
-                                      self.streams_sink_topic_2: {'partitions': 1, 'replication-factor':
1}
-                                  })
-
-    def get_consumer(self, topic, num_messages):
-        return VerifiableConsumer(self.test_context,
-                                  1,
-                                  self.kafka,
-                                  topic,
-                                  "stream-broker-resilience-verify-consumer",
-                                  max_messages=num_messages)
-
-    def assert_consume(self, test_state, topic, num_messages=5):
-        consumer = self.get_consumer(topic, num_messages)
-        consumer.start()
-
-        wait_until(lambda: consumer.total_consumed() >= num_messages,
-                   timeout_sec=120,
-                   err_msg="At %s streams did not process messages in 60 seconds " % test_state)
-
-    @staticmethod
-    def get_configs(extra_configs=""):
-        # Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout)
-        consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
-        retries_config = "producer.retries=2"
-        request_timeout = "producer.request.timeout.ms=15000"
-        max_block_ms = "producer.max.block.ms=30000"
-
-        # java code expects configs in key=value,key=value format
-        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout
+ "," + max_block_ms + extra_configs
-
-        return updated_configs
-
-    def wait_for_verification(self, processor, message, file, num_lines=1, timeout_sec=20):
-        wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines,
-                   timeout_sec=timeout_sec,
-                   err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))
-
-    @staticmethod
-    def verify_from_file(processor, message, file):
-        result = processor.node.account.ssh_output("grep '%s' %s | wc -l" % (message, file),
allow_fail=False)
-        return int(result)
-
-
-    def setUp(self):
-        self.zk.start()
-        self.kafka.start()
+        super(StreamsStandbyTask, self).__init__(test_context,
+                                                 topics={
+                                                     self.streams_source_topic: {'partitions':
6, 'replication-factor': 1},
+                                                     self.streams_sink_topic_1: {'partitions':
1, 'replication-factor': 1},
+                                                     self.streams_sink_topic_2: {'partitions':
1, 'replication-factor': 1}
+                                                 })
 
     def test_standby_tasks_rebalance(self):
 
@@ -157,8 +106,8 @@ class StreamsStandbyTask(Test):
         self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
         self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE,
num_lines=2)
 
-        self.assert_consume("assert all messages consumed from %s" % self.streams_sink_topic_1,
self.streams_sink_topic_1, self.num_messages)
-        self.assert_consume("assert all messages consumed from %s" % self.streams_sink_topic_2,
self.streams_sink_topic_2, self.num_messages)
+        self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1,
self.streams_sink_topic_1, self.num_messages)
+        self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2,
self.streams_sink_topic_2, self.num_messages)
 
         self.wait_for_verification(driver, "Producer shut down now, sent total {0} of requested
{0}".format(str(self.num_messages)),
                                    driver.STDOUT_FILE)
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index f88fd31..b7071e7 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -99,4 +99,8 @@ LATEST_0_11 = LATEST_0_11_0
 # 1.0.x versions
 V_1_0_0 = KafkaVersion("1.0.0")
 V_1_0_1 = KafkaVersion("1.0.1")
-LATEST_1_0 = V_1_0_1
\ No newline at end of file
+LATEST_1_0 = V_1_0_1
+
+# 1.1.x versions
+V_1_1_0 = KafkaVersion("1.1.0")
+LATEST_1_1 = V_1_1_0

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message