kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-8496: System test for KIP-429 upgrades and compatibility (#7529)
Date Thu, 17 Oct 2019 05:30:28 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new f38d47b  KAFKA-8496: System test for KIP-429 upgrades and compatibility (#7529)
f38d47b is described below

commit f38d47bf074ec5c23576b0124068d3b8206c903e
Author: Bill Bejeck <bill@confluent.io>
AuthorDate: Thu Oct 17 01:29:33 2019 -0400

    KAFKA-8496: System test for KIP-429 upgrades and compatibility (#7529)
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../assignment/AssignorConfiguration.java          |   4 +-
 .../StreamsUpgradeToCooperativeRebalanceTest.java  | 136 ++++++++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  |  90 +++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  |  89 +++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  |  85 +++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  |  87 +++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  | 136 ++++++++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  | 136 ++++++++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  | 136 ++++++++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  | 136 ++++++++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  | 136 ++++++++++++++
 .../StreamsUpgradeToCooperativeRebalanceTest.java  | 125 ++++++++++++
 tests/kafkatest/services/streams.py                |  85 +++++++++
 .../streams_cooperative_rebalance_upgrade_test.py  | 209 +++++++++++++++++++++
 tests/kafkatest/version.py                         |   1 +
 15 files changed, 1589 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 1e406e2..be56707 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -147,13 +147,13 @@ public final class AssignorConfiguration {
                 case StreamsConfig.UPGRADE_FROM_21:
                 case StreamsConfig.UPGRADE_FROM_22:
                 case StreamsConfig.UPGRADE_FROM_23:
-                    log.info("Turning off cooperative rebalancing for upgrade from {}.x", upgradeFrom);
+                    log.info("Eager rebalancing enabled now for upgrade from {}.x", upgradeFrom);
                     return RebalanceProtocol.EAGER;
                 default:
                     throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
             }
         }
-
+        log.info("Cooperative rebalancing enabled now");
         return RebalanceProtocol.COOPERATIVE;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..df73593
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires one argument (properties-file) but no args provided");
+        }
+        System.out.println("Args are " + Arrays.toString(args));
+        final String propFileName = args[0];
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final Properties config = new Properties();
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest)");
+        System.out.println("props=" + streamsProperties);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
+        final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
+        final String taskDelimiter = "#";
+        final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
+        final String upgradePhase = streamsProperties.getProperty("upgrade.phase", "");
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream(sourceTopic)
+            .peek(new ForeachAction<String, String>() {
+                int recordCounter = 0;
+
+                @Override
+                public void apply(final String key, final String value) {
+                    if (recordCounter++ % reportInterval == 0) {
+                        System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                        System.out.flush();
+                    }
+                }
+            }
+            ).to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+
+        streams.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING && oldState == State.REBALANCING) {
+                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
+                final StringBuilder taskReportBuilder = new StringBuilder();
+                final List<String> activeTasks = new ArrayList<>();
+                final List<String> standbyTasks = new ArrayList<>();
+                for (final ThreadMetadata threadMetadata : allThreadMetadata) {
+                    getTasks(threadMetadata.activeTasks(), activeTasks);
+                    if (!threadMetadata.standbyTasks().isEmpty()) {
+                        getTasks(threadMetadata.standbyTasks(), standbyTasks);
+                    }
+                }
+                addTasksToBuilder(activeTasks, taskReportBuilder);
+                taskReportBuilder.append(taskDelimiter);
+                if (!standbyTasks.isEmpty()) {
+                    addTasksToBuilder(standbyTasks, taskReportBuilder);
+                }
+                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
+            }
+
+            if (newState == State.REBALANCING) {
+                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+            }
+        });
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+
+    private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
+        if (!tasks.isEmpty()) {
+            for (final String task : tasks) {
+                builder.append(task).append(",");
+            }
+            builder.setLength(builder.length() - 1);
+        }
+    }
+
+    private static void getTasks(final Set<TaskMetadata> taskMetadata,
+                                 final List<String> taskList) {
+        for (final TaskMetadata task : taskMetadata) {
+            final Set<TopicPartition> topicPartitions = task.topicPartitions();
+            for (final TopicPartition topicPartition : topicPartitions) {
+                taskList.add(topicPartition.toString());
+            }
+        }
+    }
+}
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..da63d9a
--- /dev/null
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.util.Properties;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+
+        final String zookeeper = args[1];
+        final String propFileName = args.length > 2 ? args[2] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final Properties config = new Properties();
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.10.0)");
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("props=" + config);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = config.getProperty("source.topic", "source");
+        final String sinkTopic = config.getProperty("sink.topic", "sink");
+        final int reportInterval = Integer.parseInt(config.getProperty("report.interval", "100"));
+        final String upgradePhase = config.getProperty("upgrade.phase",  "");
+
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<String, String> upgradeStream = builder.stream(sourceTopic);
+        upgradeStream.foreach(new ForeachAction<String, String>() {
+            int recordCounter = 0;
+
+            @Override
+            public void apply(final String key, final String value) {
+                if (recordCounter++ % reportInterval == 0) {
+                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.flush();
+                }
+            }
+        }
+        );
+        upgradeStream.to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+}
diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..70ad666
--- /dev/null
+++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.util.Properties;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String zookeeper = args[1];
+        final String propFileName = args.length > 2 ? args[2] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final Properties config = new Properties();
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.10.1)");
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("props=" + config);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = config.getProperty("source.topic", "source");
+        final String sinkTopic = config.getProperty("sink.topic", "sink");
+        final int reportInterval = Integer.parseInt(config.getProperty("report.interval", "100"));
+        final String upgradePhase = config.getProperty("upgrade.phase",  "");
+
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<String, String> upgradeStream = builder.stream(sourceTopic);
+        upgradeStream.foreach(new ForeachAction<String, String>() {
+            int recordCounter = 0;
+
+            @Override
+            public void apply(final String key, final String value) {
+                if (recordCounter++ % reportInterval == 0) {
+                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.flush();
+                }
+            }
+        }
+        );
+        upgradeStream.to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+}
diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..7525635
--- /dev/null
+++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.util.Properties;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : ""));
+        }
+        final String propFileName = args[1];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final Properties config = new Properties();
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.10.2)");
+        System.out.println("props=" + config);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = config.getProperty("source.topic", "source");
+        final String sinkTopic = config.getProperty("sink.topic", "sink");
+        final int reportInterval = Integer.parseInt(config.getProperty("report.interval", "100"));
+        final String upgradePhase = config.getProperty("upgrade.phase",  "");
+
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<String, String> upgradeStream = builder.stream(sourceTopic);
+        upgradeStream.foreach(new ForeachAction<String, String>() {
+            int recordCounter = 0;
+
+            @Override
+            public void apply(final String key, final String value) {
+                if (recordCounter++ % reportInterval == 0) {
+                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.flush();
+                }
+            }
+        }
+        );
+        upgradeStream.to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+}
diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..a465963
--- /dev/null
+++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.util.Properties;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : ""));
+        }
+        final String kafka = args[0];
+        final String propFileName = args[1];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final Properties config = new Properties();
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.11.0)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("props=" + config);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = config.getProperty("source.topic", "source");
+        final String sinkTopic = config.getProperty("sink.topic", "sink");
+        final int reportInterval = Integer.parseInt(config.getProperty("report.interval", "100"));
+        final String upgradePhase = config.getProperty("upgrade.phase",  "");
+
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<String, String> upgradeStream = builder.stream(sourceTopic);
+        upgradeStream.foreach(new ForeachAction<String, String>() {
+            int recordCounter = 0;
+
+            @Override
+            public void apply(final String key, final String value) {
+                if (recordCounter++ % reportInterval == 0) {
+                    System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                    System.out.flush();
+                }
+            }
+        }
+        );
+        upgradeStream.to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+}
diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..09e8458
--- /dev/null
+++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        System.out.println("Args are " + Arrays.toString(args));
+        final String propFileName = args[1];
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final Properties config = new Properties();
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v1.0)");
+        System.out.println("props=" + streamsProperties);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
+        final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
+        final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
+        final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
+        final String upgradePhase = streamsProperties.getProperty("upgrade.phase",  "");
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream(sourceTopic)
+            .peek(new ForeachAction<String, String>() {
+                int recordCounter = 0;
+
+                @Override
+                public void apply(final String key, final String value) {
+                    if (recordCounter++ % reportInterval == 0) {
+                        System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                        System.out.flush();
+                    }
+                }
+            }
+            ).to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+
+        streams.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING && oldState == State.REBALANCING) {
+                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
+                final StringBuilder taskReportBuilder = new StringBuilder();
+                final List<String> activeTasks = new ArrayList<>();
+                final List<String> standbyTasks = new ArrayList<>();
+                for (final ThreadMetadata threadMetadata : allThreadMetadata) {
+                    getTasks(threadMetadata.activeTasks(), activeTasks);
+                    if (!threadMetadata.standbyTasks().isEmpty()) {
+                        getTasks(threadMetadata.standbyTasks(), standbyTasks);
+                    }
+                }
+                addTasksToBuilder(activeTasks, taskReportBuilder);
+                taskReportBuilder.append(taskDelimiter);
+                if (!standbyTasks.isEmpty()) {
+                    addTasksToBuilder(standbyTasks, taskReportBuilder);
+                }
+                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
+            }
+
+            if (newState == State.REBALANCING) {
+                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+            }
+        });
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+
+    private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
+        if (!tasks.isEmpty()) {
+            for (final String task : tasks) {
+                builder.append(task).append(",");
+            }
+            builder.setLength(builder.length() - 1);
+        }
+    }
+    private static void getTasks(final Set<TaskMetadata> taskMetadata,
+                                 final List<String> taskList) {
+        for (final TaskMetadata task : taskMetadata) {
+            final Set<TopicPartition> topicPartitions = task.topicPartitions();
+            for (final TopicPartition topicPartition : topicPartitions) {
+                taskList.add(topicPartition.toString());
+            }
+        }
+    }
+}
diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..afade75
--- /dev/null
+++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        System.out.println("Args are " + Arrays.toString(args));
+        final String propFileName = args[1];
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final Properties config = new Properties();
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v1.1)");
+        System.out.println("props=" + streamsProperties);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
+        final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
+        final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
+        final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
+        final String upgradePhase = streamsProperties.getProperty("upgrade.phase",  "");
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream(sourceTopic)
+            .peek(new ForeachAction<String, String>() {
+                int recordCounter = 0;
+
+                @Override
+                public void apply(final String key, final String value) {
+                    if (recordCounter++ % reportInterval == 0) {
+                        System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                        System.out.flush();
+                    }
+                }
+            }
+            ).to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+
+        streams.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING && oldState == State.REBALANCING) {
+                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
+                final StringBuilder taskReportBuilder = new StringBuilder();
+                final List<String> activeTasks = new ArrayList<>();
+                final List<String> standbyTasks = new ArrayList<>();
+                for (final ThreadMetadata threadMetadata : allThreadMetadata) {
+                    getTasks(threadMetadata.activeTasks(), activeTasks);
+                    if (!threadMetadata.standbyTasks().isEmpty()) {
+                        getTasks(threadMetadata.standbyTasks(), standbyTasks);
+                    }
+                }
+                addTasksToBuilder(activeTasks, taskReportBuilder);
+                taskReportBuilder.append(taskDelimiter);
+                if (!standbyTasks.isEmpty()) {
+                    addTasksToBuilder(standbyTasks, taskReportBuilder);
+                }
+                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
+            }
+
+            if (newState == State.REBALANCING) {
+                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+            }
+        });
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+
+    private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
+        if (!tasks.isEmpty()) {
+            for (final String task : tasks) {
+                builder.append(task).append(",");
+            }
+            builder.setLength(builder.length() - 1);
+        }
+    }
+    private static void getTasks(final Set<TaskMetadata> taskMetadata,
+                                 final List<String> taskList) {
+        for (final TaskMetadata task : taskMetadata) {
+            final Set<TopicPartition> topicPartitions = task.topicPartitions();
+            for (final TopicPartition topicPartition : topicPartitions) {
+                taskList.add(topicPartition.toString());
+            }
+        }
+    }
+}
diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..5b4069a
--- /dev/null
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        System.out.println("Args are " + Arrays.toString(args));
+        final String propFileName = args[1];
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final Properties config = new Properties();
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v2.0)");
+        System.out.println("props=" + streamsProperties);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
+        final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
+        final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
+        final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
+        final String upgradePhase = streamsProperties.getProperty("upgrade.phase",  "");
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream(sourceTopic)
+            .peek(new ForeachAction<String, String>() {
+                int recordCounter = 0;
+
+                @Override
+                public void apply(final String key, final String value) {
+                    if (recordCounter++ % reportInterval == 0) {
+                        System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                        System.out.flush();
+                    }
+                }
+            }
+            ).to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+
+        streams.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING && oldState == State.REBALANCING) {
+                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
+                final StringBuilder taskReportBuilder = new StringBuilder();
+                final List<String> activeTasks = new ArrayList<>();
+                final List<String> standbyTasks = new ArrayList<>();
+                for (final ThreadMetadata threadMetadata : allThreadMetadata) {
+                    getTasks(threadMetadata.activeTasks(), activeTasks);
+                    if (!threadMetadata.standbyTasks().isEmpty()) {
+                        getTasks(threadMetadata.standbyTasks(), standbyTasks);
+                    }
+                }
+                addTasksToBuilder(activeTasks, taskReportBuilder);
+                taskReportBuilder.append(taskDelimiter);
+                if (!standbyTasks.isEmpty()) {
+                    addTasksToBuilder(standbyTasks, taskReportBuilder);
+                }
+                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
+            }
+
+            if (newState == State.REBALANCING) {
+                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+            }
+        });
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+
+    private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
+        if (!tasks.isEmpty()) {
+            for (final String task : tasks) {
+                builder.append(task).append(",");
+            }
+            builder.setLength(builder.length() - 1);
+        }
+    }
+    private static void getTasks(final Set<TaskMetadata> taskMetadata,
+                                 final List<String> taskList) {
+        for (final TaskMetadata task : taskMetadata) {
+            final Set<TopicPartition> topicPartitions = task.topicPartitions();
+            for (final TopicPartition topicPartition : topicPartitions) {
+                taskList.add(topicPartition.toString());
+            }
+        }
+    }
+}
diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..174c34f
--- /dev/null
+++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        System.out.println("Args are " + Arrays.toString(args));
+        final String propFileName = args[1];
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final Properties config = new Properties();
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v2.2)");
+        System.out.println("props=" + streamsProperties);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
+        final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
+        final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
+        final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
+        final String upgradePhase = streamsProperties.getProperty("upgrade.phase",  "");
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream(sourceTopic)
+            .peek(new ForeachAction<String, String>() {
+                int recordCounter = 0;
+
+                @Override
+                public void apply(final String key, final String value) {
+                    if (recordCounter++ % reportInterval == 0) {
+                        System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                        System.out.flush();
+                    }
+                }
+            }
+            ).to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+
+        streams.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING && oldState == State.REBALANCING) {
+                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
+                final StringBuilder taskReportBuilder = new StringBuilder();
+                final List<String> activeTasks = new ArrayList<>();
+                final List<String> standbyTasks = new ArrayList<>();
+                for (final ThreadMetadata threadMetadata : allThreadMetadata) {
+                    getTasks(threadMetadata.activeTasks(), activeTasks);
+                    if (!threadMetadata.standbyTasks().isEmpty()) {
+                        getTasks(threadMetadata.standbyTasks(), standbyTasks);
+                    }
+                }
+                addTasksToBuilder(activeTasks, taskReportBuilder);
+                taskReportBuilder.append(taskDelimiter);
+                if (!standbyTasks.isEmpty()) {
+                    addTasksToBuilder(standbyTasks, taskReportBuilder);
+                }
+                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
+            }
+
+            if (newState == State.REBALANCING) {
+                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+            }
+        });
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+
+    private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
+        if (!tasks.isEmpty()) {
+            for (final String task : tasks) {
+                builder.append(task).append(",");
+            }
+            builder.setLength(builder.length() - 1);
+        }
+    }
+    private static void getTasks(final Set<TaskMetadata> taskMetadata,
+                                 final List<String> taskList) {
+        for (final TaskMetadata task : taskMetadata) {
+            final Set<TopicPartition> topicPartitions = task.topicPartitions();
+            for (final TopicPartition topicPartition : topicPartitions) {
+                taskList.add(topicPartition.toString());
+            }
+        }
+    }
+}
diff --git a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..174c34f
--- /dev/null
+++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        System.out.println("Args are " + Arrays.toString(args));
+        final String propFileName = args[1];
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final Properties config = new Properties();
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v2.2)");
+        System.out.println("props=" + streamsProperties);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
+        final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
+        final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
+        final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
+        final String upgradePhase = streamsProperties.getProperty("upgrade.phase",  "");
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream(sourceTopic)
+            .peek(new ForeachAction<String, String>() {
+                int recordCounter = 0;
+
+                @Override
+                public void apply(final String key, final String value) {
+                    if (recordCounter++ % reportInterval == 0) {
+                        System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
+                        System.out.flush();
+                    }
+                }
+            }
+            ).to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+
+        streams.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING && oldState == State.REBALANCING) {
+                System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
+                final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
+                final StringBuilder taskReportBuilder = new StringBuilder();
+                final List<String> activeTasks = new ArrayList<>();
+                final List<String> standbyTasks = new ArrayList<>();
+                for (final ThreadMetadata threadMetadata : allThreadMetadata) {
+                    getTasks(threadMetadata.activeTasks(), activeTasks);
+                    if (!threadMetadata.standbyTasks().isEmpty()) {
+                        getTasks(threadMetadata.standbyTasks(), standbyTasks);
+                    }
+                }
+                addTasksToBuilder(activeTasks, taskReportBuilder);
+                taskReportBuilder.append(taskDelimiter);
+                if (!standbyTasks.isEmpty()) {
+                    addTasksToBuilder(standbyTasks, taskReportBuilder);
+                }
+                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
+            }
+
+            if (newState == State.REBALANCING) {
+                System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
+            }
+        });
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
+            System.out.flush();
+        }));
+    }
+
+    private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
+        if (!tasks.isEmpty()) {
+            for (final String task : tasks) {
+                builder.append(task).append(",");
+            }
+            builder.setLength(builder.length() - 1);
+        }
+    }
+    private static void getTasks(final Set<TaskMetadata> taskMetadata,
+                                 final List<String> taskList) {
+        for (final TaskMetadata task : taskMetadata) {
+            final Set<TopicPartition> topicPartitions = task.topicPartitions();
+            for (final TopicPartition topicPartition : topicPartitions) {
+                taskList.add(topicPartition.toString());
+            }
+        }
+    }
+}
diff --git a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
new file mode 100644
index 0000000..0aff8e5
--- /dev/null
+++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.TaskMetadata;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+
+public class StreamsUpgradeToCooperativeRebalanceTest {
+
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        System.out.println("Args are " + Arrays.toString(args));
+        final String propFileName = args[1];
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final Properties config = new Properties();
+        System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v2.3)");
+        System.out.println("props=" + streamsProperties);
+
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
+        final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
+        final String threadDelimiter = streamsProperties.getProperty("thread.delimiter", "&");
+        final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
+        final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.<String, String>stream(sourceTopic)
+            .peek(new ForeachAction<String, String>() {
+                int recordCounter = 0;
+
+                @Override
+                public void apply(final String key, final String value) {
+                    if (recordCounter++ % reportInterval == 0) {
+                        System.out.println(String.format("Processed %d records so far", recordCounter));
+                        System.out.flush();
+                    }
+                }
+            }
+            ).to(sinkTopic);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+
+        streams.setStateListener((newState, oldState) -> {
+            if (newState == State.RUNNING && oldState == State.REBALANCING) {
+                System.out.println("STREAMS in a RUNNING State");
+                final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
+                final StringBuilder taskReportBuilder = new StringBuilder();
+                for (final ThreadMetadata threadMetadata : allThreadMetadata) {
+                    buildTaskAssignmentReport(taskReportBuilder, threadMetadata.activeTasks(), "ACTIVE-TASKS:");
+                    if (!threadMetadata.standbyTasks().isEmpty()) {
+                        taskReportBuilder.append(taskDelimiter);
+                        buildTaskAssignmentReport(taskReportBuilder, threadMetadata.standbyTasks(), "STANDBY-TASKS:");
+                    }
+                    taskReportBuilder.append(threadDelimiter);
+                }
+                taskReportBuilder.setLength(taskReportBuilder.length() - 1);
+                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
+            }
+
+            if (newState == State.REBALANCING) {
+                System.out.println("Starting a REBALANCE");
+            }
+        });
+
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println("COOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED");
+            System.out.flush();
+        }));
+    }
+
+    private static void buildTaskAssignmentReport(final StringBuilder taskReportBuilder,
+                                                  final Set<TaskMetadata> taskMetadata,
+                                                  final String taskType) {
+        taskReportBuilder.append(taskType);
+        for (final TaskMetadata task : taskMetadata) {
+            final Set<TopicPartition> topicPartitions = task.topicPartitions();
+            for (final TopicPartition topicPartition : topicPartitions) {
+                taskReportBuilder.append(topicPartition.toString()).append(",");
+            }
+        }
+        taskReportBuilder.setLength(taskReportBuilder.length() - 1);
+    }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 48bac70..52afe4e 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -536,6 +536,7 @@ class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
         cfg = KafkaConfig(**properties)
         return cfg.render()
 
+
 class StaticMemberTestService(StreamsTestBaseService):
     def __init__(self, test_context, kafka, group_instance_id, num_threads):
         super(StaticMemberTestService, self).__init__(test_context,
@@ -556,3 +557,87 @@ class StaticMemberTestService(StreamsTestBaseService):
 
         cfg = KafkaConfig(**properties)
         return cfg.render()
+
+
+class CooperativeRebalanceUpgradeService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(CooperativeRebalanceUpgradeService, self).__init__(test_context,
+                                                                 kafka,
+                                                                 "org.apache.kafka.streams.tests.StreamsUpgradeToCooperativeRebalanceTest",
+                                                                 "")
+        self.UPGRADE_FROM = None
+        # these properties will be overridden in test
+        self.SOURCE_TOPIC = None
+        self.SINK_TOPIC = None
+        self.TASK_DELIMITER = "#"
+        self.REPORT_INTERVAL = None
+
+        self.standby_tasks = None
+        self.active_tasks = None
+        self.upgrade_phase = None
+
+    def set_tasks(self, task_string):
+        label = "TASK-ASSIGNMENTS:"
+        task_string_substr = task_string[len(label):]
+        all_tasks = task_string_substr.split(self.TASK_DELIMITER)
+        self.active_tasks = set(all_tasks[0].split(","))
+        if len(all_tasks) > 1:
+            self.standby_tasks = set(all_tasks[1].split(","))
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_phase(self, upgrade_phase):
+        self.upgrade_phase = upgrade_phase
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2),
+                                          str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
+                                          str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)]:
+            args['kafka'] = self.kafka.bootstrap_servers()
+        else:
+            args['kafka'] = ""
+        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
+            args['zk'] = self.kafka.zk.connect_setting()
+        else:
+            args['zk'] = ""
+        args['config_file'] = self.CONFIG_FILE
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['version'] = self.KAFKA_STREAMS_VERSION
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+              " %(kafka_run_class)s %(streams_class_name)s %(kafka)s %(zk)s %(config_file)s " \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        self.logger.info("Executing: " + cmd)
+
+        return cmd
+
+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
+
+        if self.UPGRADE_FROM is not None:
+            properties['upgrade.from'] = self.UPGRADE_FROM
+        else:
+            try:
+                del properties['upgrade.from']
+            except KeyError:
+                self.logger.info("Key 'upgrade.from' not there, better safe than sorry")
+
+        if self.upgrade_phase is not None:
+            properties['upgrade.phase'] = self.upgrade_phase
+
+        properties['source.topic'] = self.SOURCE_TOPIC
+        properties['sink.topic'] = self.SINK_TOPIC
+        properties['task.delimiter'] = self.TASK_DELIMITER
+        properties['report.interval'] = self.REPORT_INTERVAL
+
+        cfg = KafkaConfig(**properties)
+        return cfg.render()
diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
new file mode 100644
index 0000000..3128d21
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
@@ -0,0 +1,209 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from ducktape.mark import matrix
+from ducktape.tests.test import Test
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
+    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, DEV_BRANCH, DEV_VERSION, KafkaVersion
+from kafkatest.services.streams import CooperativeRebalanceUpgradeService
+from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running
+
+
+class StreamsCooperativeRebalanceUpgradeTest(Test):
+    """
+    Test of a rolling upgrade from eager rebalance to
+    cooperative rebalance
+    """
+
+    source_topic = "source"
+    sink_topic = "sink"
+    task_delimiter = "#"
+    report_interval = "1000"
+    processing_message = "Processed [0-9]* records so far"
+    stopped_message = "COOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED"
+    running_state_msg = "STREAMS in a RUNNING State"
+    cooperative_turned_off_msg = "Eager rebalancing enabled now for upgrade from %s"
+    cooperative_enabled_msg = "Cooperative rebalancing enabled now"
+    first_bounce_phase = "first_bounce_phase-"
+    second_bounce_phase = "second_bounce_phase-"
+
+    streams_eager_rebalance_upgrade_versions = [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0),
+                                                str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2),
+                                                str(LATEST_2_3)]
+
+    def __init__(self, test_context):
+        super(StreamsCooperativeRebalanceUpgradeTest, self).__init__(test_context)
+        self.topics = {
+            self.source_topic: {'partitions': 9},
+            self.sink_topic: {'partitions': 9}
+        }
+
+        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=3,
+                                  zk=self.zookeeper, topics=self.topics)
+
+        self.producer = VerifiableProducer(self.test_context,
+                                           1,
+                                           self.kafka,
+                                           self.source_topic,
+                                           throughput=1000,
+                                           acks=1)
+
+    @matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions)
+    def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version):
+        self.zookeeper.start()
+        self.kafka.start()
+
+        processor1 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
+        processor2 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
+        processor3 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
+
+        processors = [processor1, processor2, processor3]
+
+        # produce records continually during the test
+        self.producer.start()
+
+        # start all processors without upgrade_from config; normal operations mode
+        self.logger.info("Starting all streams clients in normal running mode")
+        for processor in processors:
+            processor.set_version(upgrade_from_version)
+            self.set_props(processor)
+            processor.CLEAN_NODE_ENABLED = False
+            # can't use state as older version don't have state listener
+            # so just verify up and running
+            verify_running(processor, self.processing_message)
+
+        # all running rebalancing has ceased
+        for processor in processors:
+            self.verify_processing(processor, self.processing_message)
+
+        # first rolling bounce with "upgrade.from" config set
+        previous_phase = ""
+        self.maybe_upgrade_rolling_bounce_and_verify(processors,
+                                                     previous_phase,
+                                                     self.first_bounce_phase,
+                                                     upgrade_from_version)
+
+        # All nodes processing, rebalancing has ceased
+        for processor in processors:
+            self.verify_processing(processor, self.first_bounce_phase + self.processing_message)
+
+        # second rolling bounce without "upgrade.from" config
+        self.maybe_upgrade_rolling_bounce_and_verify(processors,
+                                                     self.first_bounce_phase,
+                                                     self.second_bounce_phase)
+
+        # All nodes processing, rebalancing has ceased
+        for processor in processors:
+            self.verify_processing(processor, self.second_bounce_phase + self.processing_message)
+
+        # now verify tasks are unique
+        for processor in processors:
+            self.get_tasks_for_processor(processor)
+            self.logger.info("Active tasks %s" % processor.active_tasks)
+
+        overlapping_tasks = processor1.active_tasks.intersection(processor2.active_tasks)
+        assert len(overlapping_tasks) == int(0), \
+            "Final task assignments are not unique %s %s" % (processor1.active_tasks, processor2.active_tasks)
+
+        overlapping_tasks = processor1.active_tasks.intersection(processor3.active_tasks)
+        assert len(overlapping_tasks) == int(0), \
+            "Final task assignments are not unique %s %s" % (processor1.active_tasks, processor3.active_tasks)
+
+        overlapping_tasks = processor2.active_tasks.intersection(processor3.active_tasks)
+        assert len(overlapping_tasks) == int(0), \
+            "Final task assignments are not unique %s %s" % (processor2.active_tasks, processor3.active_tasks)
+
+        # test done close all down
+        stop_processors(processors, self.second_bounce_phase + self.stopped_message)
+
+        self.producer.stop()
+        self.kafka.stop()
+        self.zookeeper.stop()
+
+    def maybe_upgrade_rolling_bounce_and_verify(self,
+                                                processors,
+                                                previous_phase,
+                                                current_phase,
+                                                upgrade_from_version=None):
+        for processor in processors:
+            # stop the processor in prep for setting "update.from" or removing "update.from"
+            verify_stopped(processor, previous_phase + self.stopped_message)
+            # upgrade to version with cooperative rebalance
+            processor.set_version("")
+            processor.set_upgrade_phase(current_phase)
+
+            if upgrade_from_version is not None:
+                # need to remove minor version numbers for check of valid upgrade from numbers
+                upgrade_version = upgrade_from_version[:upgrade_from_version.rfind('.')]
+                rebalance_mode_msg = self.cooperative_turned_off_msg % upgrade_version
+            else:
+                upgrade_version = None
+                rebalance_mode_msg = self.cooperative_enabled_msg
+
+            self.set_props(processor, upgrade_version)
+            node = processor.node
+            with node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
+                with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                    processor.start()
+                    # verify correct rebalance mode either turned off for upgrade or enabled after upgrade
+                    log_monitor.wait_until(rebalance_mode_msg,
+                                           timeout_sec=60,
+                                           err_msg="Never saw '%s' message " % rebalance_mode_msg + str(processor.node.account))
+
+                # verify rebalanced into a running state
+                rebalance_msg = current_phase + self.running_state_msg
+                stdout_monitor.wait_until(rebalance_msg,
+                                          timeout_sec=60,
+                                          err_msg="Never saw '%s' message " % rebalance_msg + str(
+                                              processor.node.account))
+
+                # verify processing
+                verify_processing_msg = current_phase + self.processing_message
+                stdout_monitor.wait_until(verify_processing_msg,
+                                          timeout_sec=60,
+                                          err_msg="Never saw '%s' message " % verify_processing_msg + str(
+                                              processor.node.account))
+
+    def verify_processing(self, processor, pattern):
+        self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % pattern)
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            monitor.wait_until(pattern,
+                               timeout_sec=60,
+                               err_msg="Never saw processing of %s " % pattern + str(processor.node.account))
+
+    def get_tasks_for_processor(self, processor):
+        retries = 0
+        while retries < 5:
+            found_tasks = list(processor.node.account.ssh_capture("grep TASK-ASSIGNMENTS %s | tail -n 1" % processor.STDOUT_FILE, allow_fail=True))
+            self.logger.info("Returned %s from assigned task check" % found_tasks)
+            if len(found_tasks) > 0:
+                task_string = str(found_tasks[0]).strip()
+                self.logger.info("Converted %s from assigned task check" % task_string)
+                processor.set_tasks(task_string)
+                return
+            retries += 1
+            time.sleep(1)
+        return
+
+    def set_props(self, processor, upgrade_from=None):
+        processor.SOURCE_TOPIC = self.source_topic
+        processor.SINK_TOPIC = self.sink_topic
+        processor.REPORT_INTERVAL = self.report_interval
+        processor.UPGRADE_FROM = upgrade_from
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 573e2ed..8deceac 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -129,4 +129,5 @@ LATEST_2_2 = V_2_2_1
 
 # 2.3.x versions
 V_2_3_0 = KafkaVersion("2.3.0")
+V_2_3_1 = KafkaVersion("2.3.1")
 LATEST_2_3 = V_2_3_0


Mime
View raw message