kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove sleep calls and ignore annotation from streams upgrade test (#6046)
Date Mon, 07 Jan 2019 07:04:10 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 404bdef  MINOR: Remove sleep calls and ignore annotation from streams upgrade test (#6046)
404bdef is described below

commit 404bdef08db8c88f1b7a921737279feabfd0cb1f
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Mon Jan 7 02:03:54 2019 -0500

    MINOR: Remove sleep calls and ignore annotation from streams upgrade test (#6046)
    
    The StreamsUpgradeTest::test_upgrade_downgrade_brokers test used sleep calls, which led to
    flaky test behavior; as a result, we placed an @ignore annotation on the test. This PR waits
    for log events instead of using sleep calls, so we can now remove the @ignore annotation.
    
    Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 build.gradle                                       | 12 +++
 gradle/dependencies.gradle                         |  4 +-
 settings.gradle                                    |  2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 98 ++++++++++++++++++++++
 tests/docker/Dockerfile                            |  6 +-
 .../tests/streams/streams_upgrade_test.py          | 93 ++++++++++++--------
 tests/kafkatest/version.py                         |  7 +-
 vagrant/base.sh                                    |  6 +-
 8 files changed, 185 insertions(+), 43 deletions(-)

diff --git a/build.gradle b/build.gradle
index 59354dd..65d774d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1207,6 +1207,18 @@ project(':streams:upgrade-system-tests-20') {
   }
 }
 
+project(':streams:upgrade-system-tests-21') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-21"
+
+  dependencies {
+    testCompile libs.kafkaStreams_21
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 1621be9..2c2488f 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -71,7 +71,8 @@ versions += [
   kafka_0110: "0.11.0.3",
   kafka_10: "1.0.2",
   kafka_11: "1.1.1",
-  kafka_20: "2.0.0",
+  kafka_20: "2.0.1",
+  kafka_21: "2.1.0",
   lz4: "1.5.0",
   mavenArtifact: "3.6.0",
   metrics: "2.2.0",
@@ -126,6 +127,7 @@ libs += [
   kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
   kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
   kafkaStreams_20: "org.apache.kafka:kafka-streams:$versions.kafka_20",
+  kafkaStreams_21: "org.apache.kafka:kafka-streams:$versions.kafka_21",
   log4j: "log4j:log4j:$versions.log4j",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
diff --git a/settings.gradle b/settings.gradle
index dff6c2c..a74df21 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,5 +16,5 @@
 include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils',
'streams:examples',
         'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
         'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
'streams:upgrade-system-tests-20',
-        'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json',
'connect:file',
+        'streams:upgrade-system-tests-21' , 'log4j-appender', 'connect:api', 'connect:transforms',
'connect:runtime', 'connect:json', 'connect:file',
         'connect:basic-auth-extension', 'jmh-benchmarks'
diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..3e719cf
--- /dev/null
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import java.util.Properties;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public class StreamsUpgradeTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file)
but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String propFileName = args.length > 1 ? args[1] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.1)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("props=" + streamsProperties);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" +
context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records
from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index 7c1efd6..e5cf439 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -49,7 +49,8 @@ RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2
&& curl -s
 RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl
-s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3"
 RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s
"$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2"
 RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s
"$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1"
-RUN mkdir -p "/opt/kafka-2.0.0" && chmod a+rw /opt/kafka-2.0.0 && curl -s
"$KAFKA_MIRROR/kafka_2.12-2.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.0"
+RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s
"$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1"
+RUN mkdir -p "/opt/kafka-2.1.0" && chmod a+rw /opt/kafka-2.1.0 && curl -s
"$KAFKA_MIRROR/kafka_2.12-2.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.0"
 
 # Streams test dependencies
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
@@ -58,7 +59,8 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar
-RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.0-test.jar" -o /opt/kafka-2.0.0/libs/kafka-streams-2.0.0-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.0-test.jar" -o /opt/kafka-2.1.0/libs/kafka-streams-2.1.0-test.jar
 
 # The version of Kibosh to use for testing.
 # If you update this, also update vagrant/base.sy
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index baf507b..57eaad5 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -15,17 +15,19 @@
 
 import random
 import time
-from ducktape.mark import ignore
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService,
StreamsUpgradeTestJobRunnerService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService,
\
+    StreamsUpgradeTestJobRunnerService
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0,
LATEST_1_0, LATEST_1_1, LATEST_2_0, DEV_BRANCH, DEV_VERSION, KafkaVersion
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0,
LATEST_1_0, LATEST_1_1, \
+    LATEST_2_0, LATEST_2_1, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
 # broker 0.10.0 is not compatible with newer Kafka Streams versions
-broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0),
str(LATEST_1_1), str(LATEST_2_0), str(DEV_BRANCH)]
+broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0),
str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(DEV_BRANCH)]
 
 metadata_1_versions = [str(LATEST_0_10_0)]
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0),
str(LATEST_1_1)]
@@ -38,7 +40,8 @@ class StreamsUpgradeTest(Test):
     """
     Test upgrading Kafka Streams (all version combination)
     If metadata was changes, upgrade is more difficult
-    Metadata version was bumped in 0.10.1.0
+    Metadata version was bumped in 0.10.1.0 and
+    subsequently bumped in 2.0.0
     """
 
     def __init__(self, test_context):
@@ -50,6 +53,8 @@ class StreamsUpgradeTest(Test):
         self.leader = None
         self.leader_counter = {}
 
+    processed_msg = "processed [0-9]* records"
+
     def perform_broker_upgrade(self, to_version):
         self.logger.info("First pass bounce - rolling broker upgrade")
         for node in self.kafka.nodes:
@@ -57,7 +62,6 @@ class StreamsUpgradeTest(Test):
             node.version = KafkaVersion(to_version)
             self.kafka.start_node(node)
 
-    @ignore
     @cluster(num_nodes=6)
     @matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
     def test_upgrade_downgrade_brokers(self, from_version, to_version):
@@ -69,6 +73,7 @@ class StreamsUpgradeTest(Test):
             return
 
         self.replication = 3
+        self.num_kafka_nodes = 3
         self.partitions = 1
         self.isr = 2
         self.topics = {
@@ -99,31 +104,48 @@ class StreamsUpgradeTest(Test):
         self.zk.start()
 
         # number of nodes needs to be >= 3 for the smoke test
-        self.kafka = KafkaService(self.test_context, num_nodes=3,
+        self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes,
                                   zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
         self.kafka.start()
 
         # allow some time for topics to be created
-        time.sleep(10)
+        wait_until(lambda: self.get_topics_count() >= (len(self.topics) * self.num_kafka_nodes),
+                   timeout_sec=60,
+                   err_msg="Broker did not create all topics in 60 seconds ")
 
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-        
-        self.driver.start()
-        self.processor1.start()
-        time.sleep(15)
 
-        self.perform_broker_upgrade(to_version)
+        processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
 
-        time.sleep(15)
-        self.driver.wait()
-        self.driver.stop()
+        with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor:
+            self.driver.start()
+
+            with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+                processor.start()
+                monitor.wait_until(self.processed_msg,
+                                   timeout_sec=60,
+                                   err_msg="Never saw output '%s' on" % self.processed_msg
+ str(processor.node))
+
+            connected_message = "Discovered group coordinator"
+            with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
+                    self.perform_broker_upgrade(to_version)
+
+                    log_monitor.wait_until(connected_message,
+                                           timeout_sec=120,
+                                           err_msg=("Never saw output '%s' on " % connected_message)
+ str(processor.node.account))
+
+                    stdout_monitor.wait_until(self.processed_msg,
+                                              timeout_sec=60,
+                                              err_msg="Never saw output '%s' on" % self.processed_msg
+ str(processor.node.account))
 
-        self.processor1.stop()
+            driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED',
+                                      timeout_sec=180,
+                                      err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED'
+ str(self.driver.node.account))
 
-        node = self.driver.node
-        node.account.ssh("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' %s"
% self.driver.STDOUT_FILE, allow_fail=False)
-        self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE,
allow_fail=False)
+        self.driver.stop()
+        processor.stop()
+        processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE,
allow_fail=False)
 
     @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
     def test_simple_upgrade_downgrade(self, from_version, to_version):
@@ -163,7 +185,6 @@ class StreamsUpgradeTest(Test):
 
         # shutdown
         self.driver.stop()
-        self.driver.wait()
 
         random.shuffle(self.processors)
         for p in self.processors:
@@ -174,8 +195,6 @@ class StreamsUpgradeTest(Test):
                                    timeout_sec=60,
                                    err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED'
on" + str(node.account))
 
-        self.driver.stop()
-
     @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
     @matrix(from_version=metadata_1_versions, to_version=metadata_3_or_higher_versions)
     @matrix(from_version=metadata_2_versions, to_version=metadata_3_or_higher_versions)
@@ -219,7 +238,6 @@ class StreamsUpgradeTest(Test):
 
         # shutdown
         self.driver.stop()
-        self.driver.wait()
 
         random.shuffle(self.processors)
         for p in self.processors:
@@ -230,8 +248,6 @@ class StreamsUpgradeTest(Test):
                                    timeout_sec=60,
                                    err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED'
on" + str(node.account))
 
-        self.driver.stop()
-
     def test_version_probing_upgrade(self):
         """
         Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
@@ -276,7 +292,6 @@ class StreamsUpgradeTest(Test):
 
         # shutdown
         self.driver.stop()
-        self.driver.wait()
 
         random.shuffle(self.processors)
         for p in self.processors:
@@ -287,8 +302,6 @@ class StreamsUpgradeTest(Test):
                                    timeout_sec=60,
                                    err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED'
on" + str(node.account))
 
-        self.driver.stop()
-
     def update_leader(self):
         self.leader = None
         retries = 10
@@ -329,7 +342,7 @@ class StreamsUpgradeTest(Test):
                 log_monitor.wait_until(kafka_version_str,
                                        timeout_sec=60,
                                        err_msg="Could not detect Kafka Streams version "
+ version + " " + str(node1.account))
-                monitor.wait_until("processed 100 records from topic",
+                monitor.wait_until("processed [0-9]* records from topic",
                                    timeout_sec=60,
                                    err_msg="Never saw output 'processed 100 records from
topic' on" + str(node1.account))
 
@@ -343,10 +356,10 @@ class StreamsUpgradeTest(Test):
                     log_monitor.wait_until(kafka_version_str,
                                            timeout_sec=60,
                                            err_msg="Could not detect Kafka Streams version
" + version + " " + str(node2.account))
-                    first_monitor.wait_until("processed 100 records from topic",
+                    first_monitor.wait_until("processed [0-9]* records from topic",
                                              timeout_sec=60,
                                              err_msg="Never saw output 'processed 100 records
from topic' on" + str(node1.account))
-                    second_monitor.wait_until("processed 100 records from topic",
+                    second_monitor.wait_until("processed [0-9]* records from topic",
                                               timeout_sec=60,
                                               err_msg="Never saw output 'processed 100 records
from topic' on" + str(node2.account))
 
@@ -361,13 +374,13 @@ class StreamsUpgradeTest(Test):
                         log_monitor.wait_until(kafka_version_str,
                                                timeout_sec=60,
                                                err_msg="Could not detect Kafka Streams version
" + version + " " + str(node3.account))
-                        first_monitor.wait_until("processed 100 records from topic",
+                        first_monitor.wait_until("processed [0-9]* records from topic",
                                                  timeout_sec=60,
                                                  err_msg="Never saw output 'processed 100
records from topic' on" + str(node1.account))
-                        second_monitor.wait_until("processed 100 records from topic",
+                        second_monitor.wait_until("processed [0-9]* records from topic",
                                                   timeout_sec=60,
                                                   err_msg="Never saw output 'processed 100
records from topic' on" + str(node2.account))
-                        third_monitor.wait_until("processed 100 records from topic",
+                        third_monitor.wait_until("processed [0-9]* records from topic",
                                                   timeout_sec=60,
                                                   err_msg="Never saw output 'processed 100
records from topic' on" + str(node3.account))
 
@@ -582,3 +595,11 @@ class StreamsUpgradeTest(Test):
             found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription
and group leader.s latest supported version is 5. Upgrading subscription metadata version
to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
             if len(found) > 0:
                 raise Exception("Kafka Streams failed with 'group member upgraded to metadata
4 too early'")
+
+    def get_topics_count(self):
+        count = 0
+        for node in self.kafka.nodes:
+            topic_list = self.kafka.list_topics("placeholder", node)
+            for topic in topic_list:
+                count += 1
+        return count
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 6eed81f..264eec5 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -111,4 +111,9 @@ LATEST_1_1 = V_1_1_1
 
 # 2.0.x versions
 V_2_0_0 = KafkaVersion("2.0.0")
-LATEST_2_0 = V_2_0_0
+V_2_0_1 = KafkaVersion("2.0.1")
+LATEST_2_0 = V_2_0_1
+
+# 2.1.x versions
+V_2_1_0 = KafkaVersion("2.1.0")
+LATEST_2_1 = V_2_1_0
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 4429096..59e890c 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -120,8 +120,10 @@ get_kafka 1.0.2 2.11
 chmod a+rw /opt/kafka-1.0.2
 get_kafka 1.1.1 2.11
 chmod a+rw /opt/kafka-1.1.1
-get_kafka 2.0.0 2.12
-chmod a+rw /opt/kafka-2.0.0
+get_kafka 2.0.1 2.12
+chmod a+rw /opt/kafka-2.0.1
+get_kafka 2.1.0 2.12
+chmod a+rw /opt/kafka-2.1.0
 
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local


Mime
View raw message