kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4779)
Date Sat, 07 Apr 2018 00:00:56 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0c0d836  KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4779)
0c0d836 is described below

commit 0c0d8363e5787e97cce5e0b9b86486d737a6890c
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Fri Apr 6 17:00:52 2018 -0700

    KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4779)
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Damian Guy <damian@confluent.io>
---
 bin/kafka-run-class.sh                             |  40 ++-
 build.gradle                                       |  60 ++++
 checkstyle/suppressions.xml                        |   2 +-
 docs/streams/upgrade-guide.html                    |  59 +++-
 docs/upgrade.html                                  | 195 ++++++++++-
 gradle/dependencies.gradle                         |  16 +-
 settings.gradle                                    |   4 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  17 +
 .../internals/StreamsPartitionAssignor.java        |   9 +
 .../apache/kafka/streams/StreamsConfigTest.java    |   9 +-
 .../KStreamAggregationDedupIntegrationTest.java    |   2 -
 .../internals/StreamsPartitionAssignorTest.java    |  95 +++---
 .../internals/assignment/AssignmentInfoTest.java   |   1 -
 .../kafka/streams/tests/SmokeTestClient.java       | 171 +++++-----
 .../kafka/streams/tests/SmokeTestDriver.java       |  40 ++-
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  15 +-
 .../kafka/streams/tests/StreamsSmokeTest.java      |  17 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  69 ++++
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 107 ++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 110 +++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 104 ++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 104 ++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 104 ++++++
 tests/docker/Dockerfile                            |  26 +-
 tests/kafkatest/services/streams.py                | 184 ++++++++++-
 .../tests/streams/streams_upgrade_test.py          | 357 ++++++++++++++++-----
 tests/kafkatest/version.py                         |   3 +-
 vagrant/base.sh                                    |  10 +-
 28 files changed, 1636 insertions(+), 294 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index bb786da..4dd0923 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -69,28 +69,50 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
+  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index 33fa7a7..f8daf2f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1027,6 +1027,66 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0101
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0102') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0102"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0102
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0110') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0110"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0110
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-10') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-10"
+
+  dependencies {
+    testCompile libs.kafkaStreams_10
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e3bf151..0fec810 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -189,7 +189,7 @@
               files="SmokeTestDriver.java"/>
 
     <suppress checks="NPathComplexity"
-              files="KStreamKStreamJoinTest.java"/>
+              files="KStreamKStreamJoinTest.java|SmokeTestDriver.java"/>
     <suppress checks="NPathComplexity"
               files="KStreamKStreamLeftJoinTest.java"/>
 
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index d21d505..fdc0af1 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -40,37 +40,64 @@
     </p>
 
     <p>
-        If you want to upgrade from 1.0.x to 1.1.0 and you have customized window store implementations on the <code>ReadOnlyWindowStore</code> interface
+        If you want to upgrade from 1.0.x to 1.2.0 and you have customized window store implementations on the <code>ReadOnlyWindowStore</code> interface
         you'd need to update your code to incorporate the newly added public APIs.
         Otherwise, if you are using Java 7 you don't need to make any code changes as the public API is fully backward compatible;
         but if you are using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
         Hot-swaping the jar-file only might not work for this case.
-        See <a href="#streams_api_changes_110">below</a> for a complete list of 1.1.0 API and semantic changes that allow you to advance your application and/or simplify your code base.
+        See below a complete list of <a href="#streams_api_changes_120">1.2.0</a> and <a href="#streams_api_changes_110">1.1.0</a>
+        API and semantic changes that allow you to advance your application and/or simplify your code base.
     </p>
 
     <p>
-        If you want to upgrade from 0.11.0.x to 1.0.0 you don't need to make any code changes as the public API is fully backward compatible.
+        If you want to upgrade from 0.10.2.x or 0.11.0.x to 1.2.x and you have customized window store implementations on the <code>ReadOnlyWindowStore</code> interface
+        you'd need to update your code to incorporate the newly added public APIs.
+        Otherwise, if you are using Java 7 you don't need to do any code changes as the public API is fully backward compatible;
+        but if you are using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
         However, some public APIs were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
-        See <a href="#streams_api_changes_100">below</a> for a complete list of 1.0.0 API and semantic changes that allow you to advance your application and/or simplify your code base.
-    </p>
-
-    <p>
-        If you want to upgrade from 0.10.2.x to 0.11.0 you don't need to make any code changes as the public API is fully backward compatible.
-        However, some configuration parameters were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
-        See <a href="#streams_api_changes_0110">below</a> for a complete list of 0.11.0 API and semantic changes that allow you to advance your application and/or simplify your code base.
+        See below a complete list of <a href="#streams_api_changes_120">1.2</a>, <a href="#streams_api_changes_110">1.1</a>,
+        <a href="#streams_api_changes_100">1.0</a>, and <a href="#streams_api_changes_0110">0.11.0</a> API
+        and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+        Additionally, Streams API 1.1.x requires broker on-disk message format version 0.10 or higher; thus, you need to make sure that the message
+        format is configured correctly before you upgrade your Kafka Streams application.
     </p>
 
     <p>
-        If you want to upgrade from 0.10.1.x to 0.10.2, see the <a href="/{{version}}/documentation/#upgrade_1020_streams"><b>Upgrade Section for 0.10.2</b></a>.
-        It highlights incompatible changes you need to consider to upgrade your code and application.
-        See <a href="#streams_api_changes_0102">below</a> for a complete list of 0.10.2 API and semantic changes that allow you to advance your application and/or simplify your code base.
+        If you want to upgrade from 0.10.1.x to 1.2.x see the Upgrade Sections for <a href="/{{version}}/documentation/#upgrade_1020_streams"><b>0.10.2</b></a>,
+        <a href="/{{version}}/documentation/#upgrade_1100_streams"><b>0.11.0</b></a>,
+        <a href="/{{version}}/documentation/#upgrade_100_streams"><b>1.0</b></a>,
+        <a href="/{{version}}/documentation/#upgrade_100_streams"><b>1.0</b></a>, and
+        <a href="/{{version}}/documentation/#upgrade_110_streams"><b>1.2</b></a>.
+        Note, that a brokers on-disk message format must be on version 0.10 or higher to run a Kafka Streams application version 1.2 or higher.
+        See below a complete list of <a href="#streams_api_changes_0102">0.10.2</a>, <a href="#streams_api_changes_0110">0.11.0</a>,
+        <a href="#streams_api_changes_100">1.0</a>, <a href="#streams_api_changes_110">1.1</a>, and <a href="#streams_api_changes_120">1.2</a>
+        API and semantical changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
     </p>
 
     <p>
-        If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams"><b>Upgrade Section for 0.10.1</b></a>.
-        It highlights incompatible changes you need to consider to upgrade your code and application.
-        See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+        Upgrading from 0.10.0.x to 1.2.0 directly is also possible.
+        Note, that a brokers must be on version 0.10.1 or higher and on-disk message format must be on version 0.10 or higher
+        to run a Kafka Streams application version 1.2 or higher.
+        See <a href="#streams_api_changes_0101">Streams API changes in 0.10.1</a>, <a href="#streams_api_changes_0102">Streams API changes in 0.10.2</a>,
+        <a href="#streams_api_changes_0110">Streams API changes in 0.11.0</a>, <a href="#streams_api_changes_100">Streams API changes in 1.0</a>, and
+        <a href="#streams_api_changes_110">Streams API changes in 1.1</a>, and <a href="#streams_api_changes_120">Streams API changes in 1.2</a>
+        for a complete list of API changes.
+        Upgrading to 1.2.0 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+        As an alternative, an offline upgrade is also possible.
     </p>
+    <ul>
+        <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 1.2.0</li>
+        <li> bounce each instance of your application once </li>
+        <li> prepare your newly deployed 1.2.0 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
+        <li> bounce each instance of your application once more to complete the upgrade </li>
+    </ul>
+    <p> Upgrading from 0.10.0.x to 1.2.0 in offline mode: </p>
+    <ul>
+        <li> stop all old (0.10.0.x) application instances </li>
+        <li> update your code and swap old code and jar file with new code and new jar file </li>
+        <li> restart all new (1.2.0) application instances </li>
+    </ul>
 
     <!-- TODO: verify release verion and update `id` and `href` attributes (also at other places that link to this headline) -->
     <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API changes in 1.2.0</a></h3>
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 0c5f5fd..95f2c41 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,6 +68,7 @@
 <ul>
     <li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a> increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for [...]
     <li><a href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a> extends the lower interval of <code>max.connections.per.ip minimum</code> to zero and therefore allows IP-based filtering of inbound connections.</li>
+    <li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
 </ul>
 
 <h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol Versions</a></h5>
@@ -76,7 +77,7 @@
 <h5><a id="upgrade_120_streams" href="#upgrade_120_streams">Upgrading a 1.2.0 Kafka Streams Application</a></h5>
 <ul>
     <li> Upgrading your Streams application from 1.1.0 to 1.2.0 does not require a broker upgrade.
-        A Kafka Streams 1.2.0 application can connect to 1.2, 1.1, 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
+         A Kafka Streams 1.2.0 application can connect to 1.2, 1.1, 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
     <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_120">Streams API changes in 1.2.0</a> for more details. </li>
 </ul>
 
@@ -125,6 +126,14 @@
         Hot-swaping the jar-file only might not work.</li>
 </ol>
 
+<!-- TODO add if 1.1.1 gets release
+<h5><a id="upgrade_111_notable" href="#upgrade_111_notable">Notable changes in 1.1.1</a></h5>
+<ul>
+    <li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+    <li> See the <a href="/{{version}}/documentation/streams/upgrade-guide.html"><b>Kafka Streams upgrade guide</b></a> for details about this new config.
+</ul>
+-->
+
 <h5><a id="upgrade_110_notable" href="#upgrade_110_notable">Notable changes in 1.1.0</a></h5>
 <ul>
     <li>The kafka artifact in Maven no longer depends on log4j or slf4j-log4j12. Similarly to the kafka-clients artifact, users
@@ -196,6 +205,14 @@
         Similarly for the message format version.</li>
 </ol>
 
+<!-- TODO add if 1.0.2 gets release
+<h5><a id="upgrade_102_notable" href="#upgrade_102_notable">Notable changes in 1.0.2</a></h5>
+<ul>
+    <li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+    <li> See the <a href="/{{version}}/documentation/streams/upgrade-guide.html"><b>Kafka Streams upgrade guide</b></a> for details about this new config.
+</ul>
+-->
+
 <h5><a id="upgrade_101_notable" href="#upgrade_101_notable">Notable changes in 1.0.1</a></h5>
 <ul>
     <li>Restored binary compatibility of AdminClient's Options classes (e.g. CreateTopicsOptions, DeleteTopicsOptions, etc.) with
@@ -261,17 +278,74 @@
          be used if the SaslHandshake request version is greater than 0. </li>
 </ul>
 
-<h5><a id="upgrade_100_streams" href="#upgrade_100_streams">Upgrading a 1.0.0 Kafka Streams Application</a></h5>
+<h5><a id="upgrade_100_streams" href="#upgrade_100_streams">Upgrading a 0.11.0 Kafka Streams Application</a></h5>
 <ul>
     <li> Upgrading your Streams application from 0.11.0 to 1.0.0 does not require a broker upgrade.
-        A Kafka Streams 1.0.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
-        However, Kafka Streams 1.0 requires 0.10 message format or newer and does not work with older message formats. </li>
+         A Kafka Streams 1.0.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
+         However, Kafka Streams 1.0 requires 0.10 message format or newer and does not work with older message formats. </li>
     <li> If you are monitoring on streams metrics, you will need make some changes to the metrics names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
     <li> There are a few public APIs including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code> and <code>KStreamBuilder</code>, <code>TopologyBuilder</code> are being deprecated by new APIs.
-        We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
+         We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
     <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_100">Streams API changes in 1.0.0</a> for more details. </li>
 </ul>
 
+<h5><a id="upgrade_100_streams_from_0102" href="#upgrade_100_streams_from_0102">Upgrading a 0.10.2 Kafka Streams Application</a></h5>
+<ul>
+    <li> Upgrading your Streams application from 0.10.2 to 1.0 does not require a broker upgrade.
+         A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
+    <li> If you are monitoring on streams metrics, you will need make some changes to the metrics names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
+    <li> There are a few public APIs including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code> and <code>KStreamBuilder</code>, <code>TopologyBuilder</code> are being deprecated by new APIs.
+         We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
+    <li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replaced configure parameter as these configs are deprecated. </li>
+    <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> for more details. </li>
+</ul>
+
+<h5><a id="upgrade_100_streams_from_0101" href="#upgrade_1100_streams_from_0101">Upgrading a 0.10.1 Kafka Streams Application</a></h5>
+<ul>
+    <li> Upgrading your Streams application from 0.10.1 to 1.0 does not require a broker upgrade.
+         A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
+    <li> You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+    <li> If you are monitoring on streams metrics, you will need make some changes to the metrics names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
+    <li> There are a few public APIs including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code> and <code>KStreamBuilder</code>, <code>TopologyBuilder</code> are being deprecated by new APIs.
+         We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
+    <li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replaced configure parameter as these configs are deprecated. </li>
+    <li> If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed. </li>
+    <li> If you register custom metrics, you will need to update this code, because the <code>StreamsMetric</code> interface was changed. </li>
+    <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_100">Streams API changes in 1.0.0</a>,
+         <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> and
+         <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
+</ul>
+
+<h5><a id="upgrade_100_streams_from_0100" href="#upgrade_100_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
+<ul>
+    <li> Upgrading your Streams application from 0.10.0 to 1.0 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 1.0 application can only connect to 0.1, 0.11.0, 0.10.2, or 0.10.1 brokers. </li>
+    <li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_100">Streams API changes in 1.0.0</a>,
+         <a href="/{{version}}/documentation/streams#streams_api_changes_0110">Streams API changes in 0.11.0</a>,
+         <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a>, and
+         <a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
+         Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+    <!-- TODO add if 1.0.2 gets release
+    <li> Upgrading from 0.10.0.x to 1.0.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+        As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.11.0.3 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 1.0.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+    </li>
+    -->
+    <li> Upgrading from 0.10.0.x to 1.0.0 or 1.0.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (1.0.0 or 1.0.1) application instances </li>
+        </ul>
+    </li>
+</ul>
+
 <h4><a id="upgrade_11_0_0" href="#upgrade_11_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.11.0.0</a></h4>
 <p>Kafka 0.11.0.0 introduces a new message format version as well as wire protocol changes. By following the recommended rolling upgrade plan below,
   you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_1100_notable">notable changes in 0.11.0.0</a> before upgrading.
@@ -291,7 +365,7 @@
         <ul>
             <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1 or 0.10.2).</li>
             <li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  (See <a href="#upgrade_10_performance_impact">potential performance impact
-		following the upgrade</a> for the details on what this configuration does.)</li>
+        following the upgrade</a> for the details on what this configuration does.)</li>
         </ul>
     </li>
     <li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
@@ -320,11 +394,59 @@
 <h5><a id="upgrade_1100_streams" href="#upgrade_1100_streams">Upgrading a 0.10.2 Kafka Streams Application</a></h5>
 <ul>
     <li> Upgrading your Streams application from 0.10.2 to 0.11.0 does not require a broker upgrade.
-        A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
+         A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
     <li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replaced configure parameter as these configs are deprecated. </li>
     <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> for more details. </li>
 </ul>
 
+<h5><a id="upgrade_1100_streams_from_0101" href="#upgrade_1100_streams_from_0101">Upgrading a 0.10.1 Kafka Streams Application</a></h5>
+<ul>
+    <li> Upgrading your Streams application from 0.10.1 to 0.11.0 does not require a broker upgrade.
+         A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
+    <li> You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+    <li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replaced configure parameter as these configs are deprecated. </li>
+    <li> If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed. </li>
+    <li> If you register custom metrics, you will need to update this code, because the <code>StreamsMetric</code> interface was changed. </li>
+    <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> and
+         <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
+</ul>
+
+<h5><a id="upgrade_1100_streams_from_0100" href="#upgrade_1100_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
+<ul>
+    <li> Upgrading your Streams application from 0.10.0 to 0.11.0 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.11.0 application can only connect to 0.11.0, 0.10.2, or 0.10.1 brokers. </li>
+    <li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0110">Streams API changes in 0.11.0</a>,
+         <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a>, and
+         <a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
+         Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+    <!-- TODO add if 0.11.0.3 gets release
+    <li> Upgrading from 0.10.0.x to 0.11.0.3 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+        As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.11.0.3 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+    </li>
+    -->
+    <li> Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an offline upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (0.11.0.0 , 0.11.0.1, or 0.11.0.2) application instances </li>
+        </ul>
+    </li>
+</ul>
+
+<!-- TODO add if 0.11.0.3 gets release
+<h5><a id="upgrade_1103_notable" href="#upgrade_1103_notable">Notable changes in 0.11.0.3</a></h5>
+<ul>
+<li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+<li> See the <a href="/{{version}}/documentation/streams/upgrade-guide.html"><b>Kafka Streams upgrade guide</b></a> for details about this new config.
+</ul>
+-->
+
 <h5><a id="upgrade_1100_notable" href="#upgrade_1100_notable">Notable changes in 0.11.0.0</a></h5>
 <ul>
     <li>Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to
@@ -475,6 +597,39 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
     <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
 </ul>
 
+<h5><a id="upgrade_1020_streams_from_0100" href="#upgrade_1020_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
+<ul>
+    <li> Upgrading your Streams application from 0.10.0 to 0.10.2 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.2 application can only connect to 0.10.2 or 0.10.1 brokers. </li>
+    <li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details).
+         Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+    <!-- TODO add if 0.10.2.2 gets release
+    <li> Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.2.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+    </li>
+    -->
+    <li> Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (0.10.2.0 or 0.10.2.1) application instances </li>
+        </ul>
+    </li>
+</ul>
+
+<!-- TODO add if 0.10.2.2 gets release
+<h5><a id="upgrade_10202_notable" href="#upgrade_10202_notable">Notable changes in 0.10.2.2</a></h5>
+<ul>
+<li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+</ul>
+-->
+
 <h5><a id="upgrade_10201_notable" href="#upgrade_10201_notable">Notable changes in 0.10.2.1</a></h5>
 <ul>
   <li> The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer <code>retries</code> default value was changed from 0 to 10. The internal Kafka Streams consumer <code>max.poll.interval.ms</code>  default value was changed from 300000 to <code>Integer.MAX_VALUE</code>.
@@ -552,7 +707,26 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
 <ul>
     <li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers. </li>
     <li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
-         Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+     Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+    <!-- TODO add if 0.10.1.2 gets release
+        <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+            <ul>
+                <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
+                <li> bounce each instance of your application once </li>
+                <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
+                <li> bounce each instance of your application once more to complete the upgrade </li>
+            </ul>
+        </li>
+        -->
+    <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+    <ul>
+        <li> stop all old (0.10.0.x) application instances </li>
+        <li> update your code and swap old code and jar file with new code and new jar file </li>
+        <li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li>
+    </ul>
+    </li>
 </ul>
 
 <h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
@@ -596,14 +770,17 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
 </ul>
 
 <h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 0.10.0.0</a></h4>
+<p>
 0.10.0.0 has <a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading) and possible <a href="#upgrade_10_performance_impact">  performance impact following the upgrade</a>. By following the recommended rolling upgrade plan below, you guarantee no downtime and no performance impact during and following the upgrade.
 <br>
 Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
-<p/>
+</p>
+<p>
 <b>Notes to clients with version 0.9.0.0: </b>Due to a bug introduced in 0.9.0.0,
 clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not
 work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 <b>before</b> brokers are upgraded to
 0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients.
+</p>
 
 <p><b>For a rolling upgrade:</b></p>
 
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 32c0040..effe763 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -62,6 +62,11 @@ versions += [
   jaxb: "2.3.0",
   jopt: "5.0.4",
   junit: "4.12",
+  kafka_0100: "0.10.0.1",
+  kafka_0101: "0.10.1.1",
+  kafka_0102: "0.10.2.1",
+  kafka_0110: "0.11.0.2",
+  kafka_10: "1.0.1",
   lz4: "1.4.1",
   metrics: "2.2.0",
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
@@ -101,12 +106,16 @@ libs += [
   jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
   jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
   jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
-  jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
   jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
+  jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
+  joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
   junit: "junit:junit:$versions.junit",
+  kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
+  kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
+  kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
+  kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
+  kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
   log4j: "log4j:log4j:$versions.log4j",
-  scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
-  joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
   powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
@@ -114,6 +123,7 @@ libs += [
   reflections: "org.reflections:reflections:$versions.reflections",
   rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
   scalaLibrary: "org.scala-lang:scala-library:$versions.scala",
+  scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
   scalaReflect: "org.scala-lang:scala-reflect:$versions.scala",
   scalatest: "org.scalatest:scalatest_$versions.baseScala:$versions.scalatest",
   scoveragePlugin: "org.scoverage:scalac-scoverage-plugin_$versions.baseScala:$versions.scoverage",
diff --git a/settings.gradle b/settings.gradle
index e599d01..0313684 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples',
+        'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
+        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender',
         'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0a52516..819bebd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -168,6 +168,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ADMIN_CLIENT_PREFIX = "admin.";
 
     /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
+     */
+    public static final String UPGRADE_FROM_0100 = "0.10.0";
+
+    /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
     public static final String AT_LEAST_ONCE = "at_least_once";
@@ -340,6 +345,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
     private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface. This config is deprecated, use <code>" + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "</code> instead";
 
+    /** {@code upgrade.from} */
+    public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
+    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
+        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+
     /**
      * {@code value.serde}
      * @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead.
@@ -562,6 +572,12 @@ public class StreamsConfig extends AbstractConfig {
                     10 * 60 * 1000L,
                     Importance.LOW,
                     STATE_CLEANUP_DELAY_MS_DOC)
+            .define(UPGRADE_FROM_CONFIG,
+                    ConfigDef.Type.STRING,
+                    null,
+                    in(null, UPGRADE_FROM_0100),
+                    Importance.LOW,
+                    UPGRADE_FROM_DOC)
             .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
                     Type.LONG,
                     24 * 60 * 60 * 1000L,
@@ -793,6 +809,7 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
 
         // add configs required for stream partition assignor
+        consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 0edbe2f..97771e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -179,6 +179,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
     private TaskManager taskManager;
     private PartitionGrouper partitionGrouper;
 
+    private int userMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+
     private InternalTopicManager internalTopicManager;
     private CopartitionedTopicsValidator copartitionedTopicsValidator;
 
@@ -197,6 +199,12 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
+        final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
+            log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
+            userMetadataVersion = 1;
+        }
+
         final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
         if (o == null) {
             final KafkaException fatalException = new KafkaException("TaskManager is not specified");
@@ -255,6 +263,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
         standbyTasks.removeAll(previousActiveTasks);
         final SubscriptionInfo data = new SubscriptionInfo(
+            userMetadataVersion,
             taskManager.processId(),
             previousActiveTasks,
             standbyTasks,
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 0309659..87f7075 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -472,6 +472,7 @@ public class StreamsConfigTest {
         assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldBeBackwardsCompatibleWithDeprecatedConfigs() {
         final Properties props = minimalStreamsConfig();
@@ -506,6 +507,7 @@ public class StreamsConfigTest {
         assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
         final Properties props = minimalStreamsConfig();
@@ -519,6 +521,7 @@ public class StreamsConfigTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyCorrectKeySerdeClassOnError() {
         final Properties props = minimalStreamsConfig();
@@ -532,6 +535,7 @@ public class StreamsConfigTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
         final Properties props = minimalStreamsConfig();
@@ -545,6 +549,7 @@ public class StreamsConfigTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyCorrectValueSerdeClassOnError() {
         final Properties props = minimalStreamsConfig();
@@ -567,9 +572,7 @@ public class StreamsConfigTest {
         }
 
         @Override
-        public void close() {
-
-        }
+        public void close() {}
 
         @Override
         public Serializer serializer() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 4c12bb9..44e139a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -313,6 +313,4 @@ public class KStreamAggregationDedupIntegrationTest {
 
     }
 
-
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index b0c0d68..e9ed968 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -131,7 +131,7 @@ public class StreamsPartitionAssignorTest {
     private void mockTaskManager(final Set<TaskId> prevTasks,
                                  final Set<TaskId> cachedTasks,
                                  final UUID processId,
-                                 final InternalTopologyBuilder builder) throws NoSuchFieldException, IllegalAccessException {
+                                 final InternalTopologyBuilder builder) {
         EasyMock.expect(taskManager.builder()).andReturn(builder).anyTimes();
         EasyMock.expect(taskManager.prevActiveTaskIds()).andReturn(prevTasks).anyTimes();
         EasyMock.expect(taskManager.cachedTasksIds()).andReturn(cachedTasks).anyTimes();
@@ -167,7 +167,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void testSubscription() throws Exception {
+    public void testSubscription() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -195,7 +195,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignBasic() throws Exception {
+    public void testAssignBasic() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -235,11 +235,9 @@ public class StreamsPartitionAssignorTest {
 
         // check assignment info
 
-        Set<TaskId> allActiveTasks = new HashSet<>();
-
         // the first consumer
         AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks());
+        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
 
         // the second consumer
         AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
@@ -259,7 +257,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() throws Exception {
+    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1");
@@ -327,7 +325,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithPartialTopology() throws Exception {
+    public void testAssignWithPartialTopology() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
         builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1");
@@ -352,9 +350,8 @@ public class StreamsPartitionAssignorTest {
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
         // check assignment info
-        Set<TaskId> allActiveTasks = new HashSet<>();
         AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks());
+        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -362,7 +359,7 @@ public class StreamsPartitionAssignorTest {
 
 
     @Test
-    public void testAssignEmptyMetadata() throws Exception {
+    public void testAssignEmptyMetadata() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -392,9 +389,8 @@ public class StreamsPartitionAssignorTest {
             new HashSet<>(assignments.get("consumer10").partitions()));
 
         // check assignment info
-        Set<TaskId> allActiveTasks = new HashSet<>();
         AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks());
+        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
 
         assertEquals(0, allActiveTasks.size());
         assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
@@ -417,7 +413,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithNewTasks() throws Exception {
+    public void testAssignWithNewTasks() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addSource(null, "source3", null, null, null, "topic3");
@@ -450,13 +446,9 @@ public class StreamsPartitionAssignorTest {
         // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match
         // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and
         // then later ones will be re-assigned to other hosts due to load balancing
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        Set<TopicPartition> allPartitions = new HashSet<>();
-        AssignmentInfo info;
-
-        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        allActiveTasks.addAll(info.activeTasks());
-        allPartitions.addAll(assignments.get("consumer10").partitions());
+        AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        Set<TaskId> allActiveTasks = new HashSet<>(info.activeTasks());
+        Set<TopicPartition> allPartitions = new HashSet<>(assignments.get("consumer10").partitions());
 
         info = AssignmentInfo.decode(assignments.get("consumer11").userData());
         allActiveTasks.addAll(info.activeTasks());
@@ -471,7 +463,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithStates() throws Exception {
+    public void testAssignWithStates() {
         builder.setApplicationId(applicationId);
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
@@ -542,7 +534,10 @@ public class StreamsPartitionAssignorTest {
         assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
     }
 
-    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) {
+    private Set<TaskId> tasksForState(final String applicationId,
+                                      final String storeName,
+                                      final List<TaskId> tasks,
+                                      final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) {
         final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
 
         Set<TaskId> ids = new HashSet<>();
@@ -560,7 +555,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithStandbyReplicas() throws Exception {
+    public void testAssignWithStandbyReplicas() {
         Map<String, Object> props = configProps();
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -598,13 +593,10 @@ public class StreamsPartitionAssignorTest {
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
-        Set<TaskId> allActiveTasks = new HashSet<>();
-        Set<TaskId> allStandbyTasks = new HashSet<>();
-
         // the first consumer
         AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks());
-        allStandbyTasks.addAll(info10.standbyTasks().keySet());
+        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
+        Set<TaskId> allStandbyTasks = new HashSet<>(info10.standbyTasks().keySet());
 
         // the second consumer
         AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
@@ -632,7 +624,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void testOnAssignment() throws Exception {
+    public void testOnAssignment() {
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         final List<TaskId> activeTaskList = Utils.mkList(task0, task3);
@@ -667,7 +659,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithInternalTopics() throws Exception {
+    public void testAssignWithInternalTopics() {
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -697,7 +689,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
+    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
         String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
@@ -732,7 +724,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldGenerateTasksForAllCreatedPartitions() throws Exception {
+    public void shouldGenerateTasksForAllCreatedPartitions() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
@@ -832,7 +824,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+    public void shouldAddUserDefinedEndPointToSubscription() {
         builder.setApplicationId(applicationId);
         builder.addSource(null, "source", null, null, null, "input");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -851,7 +843,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+    public void shouldMapUserEndPointToTopicPartitions() {
         builder.setApplicationId(applicationId);
         builder.addSource(null, "source", null, null, null, "topic1");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -881,7 +873,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
         builder.setApplicationId(applicationId);
 
         mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), UUID.randomUUID(), builder);
@@ -908,7 +900,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() throws Exception {
+    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
@@ -1010,7 +1002,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() throws Exception {
+    public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() {
         final TopicPartition partitionOne = new TopicPartition("topic", 1);
         final TopicPartition partitionTwo = new TopicPartition("topic", 2);
         final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(
@@ -1028,7 +1020,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
+    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
@@ -1096,7 +1088,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() throws Exception {
+    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put(
@@ -1114,10 +1106,11 @@ public class StreamsPartitionAssignorTest {
             )
         );
 
-        mockTaskManager(Collections.<TaskId>emptySet(),
-            Collections.<TaskId>emptySet(),
+        mockTaskManager(
+            emptyTasks,
+            emptyTasks,
             UUID.randomUUID(),
-            new InternalTopologyBuilder());
+            builder);
         partitionAssignor.configure(configProps());
         final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
 
@@ -1126,6 +1119,22 @@ public class StreamsPartitionAssignorTest {
         assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1));
     }
 
+    @Test
+    public void shouldDownGradeSubscription() {
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+
+        mockTaskManager(
+            emptyTasks,
+            emptyTasks,
+            UUID.randomUUID(),
+            builder);
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, (Object) StreamsConfig.UPGRADE_FROM_0100));
+
+        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+        assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1));
+    }
+
     private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
         final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
                                                        Collections.<TaskId, Set<TopicPartition>>emptyMap(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 726a562..c1020a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -68,7 +68,6 @@ public class AssignmentInfoTest {
         assertEquals(1, decoded.version());
     }
 
-
     /**
      * This is a clone of what the V1 encoding did. The encode method has changed for V2
      * so it is impossible to test compatibility without having this
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index e2493b2..2936c63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.tests;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -30,10 +31,13 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -56,7 +60,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         streams = createKafkaStreams(streamsProperties, kafka);
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
-            public void uncaughtException(Thread t, Throwable e) {
+            public void uncaughtException(final Thread t, final Throwable e) {
                 System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
                 uncaughtException = true;
                 e.printStackTrace();
@@ -93,38 +97,45 @@ public class SmokeTestClient extends SmokeTestUtil {
         }
     }
 
-    private static KafkaStreams createKafkaStreams(final Properties props, final String kafka) {
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-        props.put(ProducerConfig.ACKS_CONFIG, "all");
+    private static Properties getStreamsConfig(final Properties props, final String kafka) {
+        final Properties config = new Properties(props);
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+        config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+        config.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
+        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        config.put(ProducerConfig.ACKS_CONFIG, "all");
         //TODO remove this config or set to smaller value when KIP-91 is merged
-        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 80000);
+        config.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 80000);
+
+        config.putAll(props);
+        return config;
+    }
 
-        StreamsBuilder builder = new StreamsBuilder();
-        Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
-        KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
-        source.to(stringSerde, intSerde, "echo");
-        KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
+    private static KafkaStreams createKafkaStreams(final Properties props, final String kafka) {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
+        final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
+        source.to("echo", Produced.with(stringSerde, intSerde));
+        final KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
             @Override
-            public boolean test(String key, Integer value) {
+            public boolean test(final String key, final Integer value) {
                 return value == null || value != END;
             }
         });
         data.process(SmokeTestUtil.printProcessorSupplier("data"));
 
         // min
-        KGroupedStream<String, Integer>
-            groupedData =
+        final KGroupedStream<String, Integer> groupedData =
             data.groupByKey(Serialized.with(stringSerde, intSerde));
 
-        groupedData.aggregate(
+        groupedData
+            .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)))
+            .aggregate(
                 new Initializer<Integer>() {
                     public Integer apply() {
                         return Integer.MAX_VALUE;
@@ -132,21 +143,24 @@ public class SmokeTestClient extends SmokeTestUtil {
                 },
                 new Aggregator<String, Integer, Integer>() {
                     @Override
-                    public Integer apply(String aggKey, Integer value, Integer aggregate) {
+                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
                         return (value < aggregate) ? value : aggregate;
                     }
                 },
-                TimeWindows.of(TimeUnit.DAYS.toMillis(1)),
-                intSerde, "uwin-min"
-        ).toStream().map(
-                new Unwindow<String, Integer>()
-        ).to(stringSerde, intSerde, "min");
+                Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min").withValueSerde(intSerde))
+            .toStream(new Unwindow<String, Integer>())
+            .to("min", Produced.with(stringSerde, intSerde));
 
-        KTable<String, Integer> minTable = builder.table("min", stringIntConsumed);
+        final KTable<String, Integer> minTable = builder.table(
+            "min",
+            Consumed.with(stringSerde, intSerde),
+            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("minStoreName"));
         minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
 
         // max
-        groupedData.aggregate(
+        groupedData
+            .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(2)))
+            .aggregate(
                 new Initializer<Integer>() {
                     public Integer apply() {
                         return Integer.MIN_VALUE;
@@ -154,21 +168,24 @@ public class SmokeTestClient extends SmokeTestUtil {
                 },
                 new Aggregator<String, Integer, Integer>() {
                     @Override
-                    public Integer apply(String aggKey, Integer value, Integer aggregate) {
+                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
                         return (value > aggregate) ? value : aggregate;
                     }
                 },
-                TimeWindows.of(TimeUnit.DAYS.toMillis(2)),
-                intSerde, "uwin-max"
-        ).toStream().map(
-                new Unwindow<String, Integer>()
-        ).to(stringSerde, intSerde, "max");
+                Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
+            .toStream(new Unwindow<String, Integer>())
+            .to("max", Produced.with(stringSerde, intSerde));
 
-        KTable<String, Integer> maxTable = builder.table("max", stringIntConsumed);
+        final KTable<String, Integer> maxTable = builder.table(
+            "max",
+            Consumed.with(stringSerde, intSerde),
+            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("maxStoreName"));
         maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
 
         // sum
-        groupedData.aggregate(
+        groupedData
+            .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(2)))
+            .aggregate(
                 new Initializer<Long>() {
                     public Long apply() {
                         return 0L;
@@ -176,70 +193,74 @@ public class SmokeTestClient extends SmokeTestUtil {
                 },
                 new Aggregator<String, Integer, Long>() {
                     @Override
-                    public Long apply(String aggKey, Integer value, Long aggregate) {
+                    public Long apply(final String aggKey, final Integer value, final Long aggregate) {
                         return (long) value + aggregate;
                     }
                 },
-                TimeWindows.of(TimeUnit.DAYS.toMillis(2)),
-                longSerde, "win-sum"
-        ).toStream().map(
-                new Unwindow<String, Long>()
-        ).to(stringSerde, longSerde, "sum");
-
-        Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
-        KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
+                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
+            .toStream(new Unwindow<String, Long>())
+            .to("sum", Produced.with(stringSerde, longSerde));
+
+        final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
+        final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
         sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
+
         // cnt
-        groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2)), "uwin-cnt")
-            .toStream().map(
-                new Unwindow<String, Long>()
-        ).to(stringSerde, longSerde, "cnt");
+        groupedData
+            .windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(2)))
+            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("uwin-cnt"))
+            .toStream(new Unwindow<String, Long>())
+            .to("cnt", Produced.with(stringSerde, longSerde));
 
-        KTable<String, Long> cntTable = builder.table("cnt", stringLongConsumed);
+        final KTable<String, Long> cntTable = builder.table(
+            "cnt",
+            Consumed.with(stringSerde, longSerde),
+            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cntStoreName"));
         cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
 
         // dif
-        maxTable.join(minTable,
+        maxTable
+            .join(
+                minTable,
                 new ValueJoiner<Integer, Integer, Integer>() {
-                    public Integer apply(Integer value1, Integer value2) {
+                    public Integer apply(final Integer value1, final Integer value2) {
                         return value1 - value2;
                     }
-                }
-        ).to(stringSerde, intSerde, "dif");
+                })
+            .toStream()
+            .to("dif", Produced.with(stringSerde, intSerde));
 
         // avg
-        sumTable.join(
+        sumTable
+            .join(
                 cntTable,
                 new ValueJoiner<Long, Long, Double>() {
-                    public Double apply(Long value1, Long value2) {
+                    public Double apply(final Long value1, final Long value2) {
                         return (double) value1 / (double) value2;
                     }
-                }
-        ).to(stringSerde, doubleSerde, "avg");
+                })
+            .toStream()
+            .to("avg", Produced.with(stringSerde, doubleSerde));
 
         // test repartition
-        Agg agg = new Agg();
-        cntTable.groupBy(agg.selector(),
-                         Serialized.with(stringSerde, longSerde)
-        ).aggregate(agg.init(),
-                    agg.adder(),
-                    agg.remover(),
-                    Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
-                            .withKeySerde(Serdes.String())
-                            .withValueSerde(Serdes.Long())
-        ).to(stringSerde, longSerde, "tagg");
-
-        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
+        final Agg agg = new Agg();
+        cntTable.groupBy(agg.selector(), Serialized.with(stringSerde, longSerde))
+            .aggregate(agg.init(), agg.adder(), agg.remover(),
+                Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.Long()))
+            .toStream()
+            .to("tagg", Produced.with(stringSerde, longSerde));
+
+        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(props, kafka));
         streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
-            public void uncaughtException(Thread t, Throwable e) {
+            public void uncaughtException(final Thread t, final Throwable e) {
                 System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-                
                 streamsClient.close(30, TimeUnit.SECONDS);
             }
         });
 
         return streamsClient;
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index a3f520a..fc7a26e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -136,53 +136,65 @@ public class SmokeTestDriver extends SmokeTestUtil {
         System.out.println("shutdown");
     }
 
-    public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) {
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey) {
+        return generate(kafka, numKeys, maxRecordsPerKey, true);
+    }
+
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final boolean autoTerminate) {
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        // the next 4 config values make sure that all records are produced with no loss and
-        // no duplicates
+        // the next 2 config values make sure that all records are produced with no loss and no duplicates
         producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
         producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 80000);
 
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
         int numRecordsProduced = 0;
 
-        Map<String, Set<Integer>> allData = new HashMap<>();
-        ValueList[] data = new ValueList[numKeys];
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
         for (int i = 0; i < numKeys; i++) {
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
             allData.put(data[i].key, new HashSet<Integer>());
         }
-        Random rand = new Random();
+        final Random rand = new Random();
 
-        int remaining = data.length;
+        int remaining = 1; // dummy value must be positive if <autoTerminate> is false
+        if (autoTerminate) {
+            remaining = data.length;
+        }
 
         List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
 
         while (remaining > 0) {
-            int index = rand.nextInt(remaining);
-            String key = data[index].key;
+            final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
+            final String key = data[index].key;
             int value = data[index].next();
 
-            if (value < 0) {
+            if (autoTerminate && value < 0) {
                 remaining--;
                 data[index] = data[remaining];
             } else {
 
-                ProducerRecord<byte[], byte[]> record =
-                        new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
+                final ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
 
                 producer.send(record, new TestCallback(record, needRetry));
 
                 numRecordsProduced++;
                 allData.get(key).add(value);
-                if (numRecordsProduced % 100 == 0)
+                if (numRecordsProduced % 100 == 0) {
                     System.out.println(numRecordsProduced + " records produced");
+                }
                 Utils.sleep(2);
             }
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index dc4c91b..87ca829 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -44,20 +44,15 @@ public class SmokeTestUtil {
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
-                    private ProcessorContext context;
 
                     @Override
                     public void init(final ProcessorContext context) {
                         System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         numRecordsProcessed = 0;
-                        this.context = context;
                     }
 
                     @Override
                     public void process(final Object key, final Object value) {
-                        if (printOffset) {
-                            System.out.println(">>> " + context.offset());
-                        }
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
                             System.out.println(System.currentTimeMillis());
@@ -66,19 +61,19 @@ public class SmokeTestUtil {
                     }
 
                     @Override
-                    public void punctuate(final long timestamp) { }
+                    public void punctuate(final long timestamp) {}
 
                     @Override
-                    public void close() { }
+                    public void close() {}
                 };
             }
         };
     }
 
-    public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
+    public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, K> {
         @Override
-        public KeyValue<K, V> apply(final Windowed<K> winKey, final V value) {
-            return new KeyValue<>(winKey.key(), value);
+        public K apply(final Windowed<K> winKey, final V value) {
+            return winKey.key();
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 27aba29..41c3f6c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.utils.Utils;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -26,22 +25,24 @@ import java.util.Set;
 public class StreamsSmokeTest {
 
     /**
-     *  args ::= kafka propFileName command
+     *  args ::= kafka propFileName command disableAutoTerminate
      *  command := "run" | "process"
      *
      * @param args
      */
-    public static void main(final String[] args) throws InterruptedException, IOException {
+    public static void main(final String[] args) throws Exception {
         final String kafka = args[0];
         final String propFileName = args.length > 1 ? args[1] : null;
         final String command = args.length > 2 ? args[2] : null;
+        final boolean disableAutoTerminate = args.length > 3;
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
 
-        System.out.println("StreamsTest instance started");
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
         System.out.println("command=" + command);
         System.out.println("kafka=" + kafka);
         System.out.println("props=" + streamsProperties);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
 
         switch (command) {
             case "standalone":
@@ -51,8 +52,12 @@ public class StreamsSmokeTest {
                 // this starts the driver (data generation and result verification)
                 final int numKeys = 10;
                 final int maxRecordsPerKey = 500;
-                Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
-                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                if (disableAutoTerminate) {
+                    SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
+                } else {
+                    Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
                 break;
             case "process":
                 // this starts a KafkaStreams client
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..69eea0b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String propFileName = args.length > 1 ? args[1] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("props=" + streamsProperties);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+}
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..32f96a0
--- /dev/null
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String propFileName = args.length > 2 ? args[2] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("props=" + streamsProperties);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..dcb05ca
--- /dev/null
+++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be executed, as long as Kafka 0.10.1.2 is not released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String propFileName = args.length > 2 ? args[2] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("props=" + streamsProperties);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..fb4a409
--- /dev/null
+++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be executed, as long as Kafka 0.10.2.2 is not released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String propFileName = args.length > 1 ? args[1] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.2)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("props=" + streamsProperties);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..b1aad5d
--- /dev/null
+++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be executed, as long as Kafka 0.11.0.3 is not released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String propFileName = args.length > 1 ? args[1] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.11.0)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("props=" + streamsProperties);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..dc72f2d
--- /dev/null
+++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be executed, as long as Kafka 1.0.2 is not released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String propFileName = args.length > 1 ? args[1] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.0)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("props=" + streamsProperties);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index 57ca242..25da8db 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -39,24 +39,36 @@ COPY ./ssh-config /root/.ssh/config
 RUN ssh-keygen -q -t rsa -N '' -f /root/.ssh/id_rsa && cp -f /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
 
 # Install binary test dependencies.
+# we use the same versions as in vagrant/base.sh
 ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
-RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
+RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
 RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
+RUN mkdir -p "/opt/kafka-0.10.0.0" && chmod a+rw /opt/kafka-0.10.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.0"
 RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
+RUN mkdir -p "/opt/kafka-0.10.1.0" && chmod a+rw /opt/kafka-0.10.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.0"
 RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
+RUN mkdir -p "/opt/kafka-0.10.2.0" && chmod a+rw /opt/kafka-0.10.2.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.0"
 RUN mkdir -p "/opt/kafka-0.10.2.1" && chmod a+rw /opt/kafka-0.10.2.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
 RUN mkdir -p "/opt/kafka-0.11.0.0" && chmod a+rw /opt/kafka-0.11.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
+RUN mkdir -p "/opt/kafka-0.11.0.1" && chmod a+rw /opt/kafka-0.11.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.1"
 RUN mkdir -p "/opt/kafka-0.11.0.2" && chmod a+rw /opt/kafka-0.11.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.2"
 RUN mkdir -p "/opt/kafka-1.0.0" && chmod a+rw /opt/kafka-1.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.0"
 RUN mkdir -p "/opt/kafka-1.0.1" && chmod a+rw /opt/kafka-1.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.1"
+RUN mkdir -p "/opt/kafka-1.1.0" && chmod a+rw /opt/kafka-1.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.0"
 
 # Streams test dependencies
-RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar && \
-    curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.1-test.jar" -o /opt/kafka-0.10.2.1/libs/kafka-streams-0.10.2.1-test.jar && \
-    curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.0-test.jar" -o /opt/kafka-0.11.0.0/libs/kafka-streams-0.11.0.0-test.jar && \
-    curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.2-test.jar" -o /opt/kafka-0.11.0.2/libs/kafka-streams-0.11.0.2-test.jar && \
-    curl -s "$KAFKA_MIRROR/kafka-streams-1.0.0-test.jar" -o /opt/kafka-1.0.0/libs/kafka-streams-1.0.0-test.jar && \
-    curl -s "$KAFKA_MIRROR/kafka-streams-1.0.1-test.jar" -o /opt/kafka-1.0.1/libs/kafka-streams-1.0.1-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.0-test.jar" -o /opt/kafka-0.10.0.0/libs/kafka-streams-0.10.0.0-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.0-test.jar" -o /opt/kafka-0.10.1.0/libs/kafka-streams-0.10.1.0-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.0-test.jar" -o /opt/kafka-0.10.2.0/libs/kafka-streams-0.10.2.0-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.1-test.jar" -o /opt/kafka-0.10.2.1/libs/kafka-streams-0.10.2.1-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.0-test.jar" -o /opt/kafka-0.11.0.0/libs/kafka-streams-0.11.0.0-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.1-test.jar" -o /opt/kafka-0.11.0.1/libs/kafka-streams-0.11.0.1-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.2-test.jar" -o /opt/kafka-0.11.0.2/libs/kafka-streams-0.11.0.2-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.0-test.jar" -o /opt/kafka-1.0.0/libs/kafka-streams-1.0.0-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.1-test.jar" -o /opt/kafka-1.0.1/libs/kafka-streams-1.0.1-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.0-test.jar" -o /opt/kafka-1.1.0/libs/kafka-streams-1.1.0-test.jar
 
 # The version of Kibosh to use for testing.
 # If you update this, also update vagrant/base.sy
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index d9b475e..a5be816 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -21,6 +21,7 @@ from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.kafka import KafkaConfig
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
 
 STATE_DIR = "state.dir"
 
@@ -39,6 +40,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
 
+    CLEAN_NODE_ENABLED = True
+
     logs = {
         "streams_log": {
             "path": LOG_FILE,
@@ -49,6 +52,114 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
         "streams_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "streams_log.0-1": {
+            "path": LOG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stdout.0-1": {
+            "path": STDOUT_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stderr.0-1": {
+            "path": STDERR_FILE + ".0-1",
+            "collect_default": True},
+        "streams_log.0-2": {
+            "path": LOG_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stdout.0-2": {
+            "path": STDOUT_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stderr.0-2": {
+            "path": STDERR_FILE + ".0-2",
+            "collect_default": True},
+        "streams_log.0-3": {
+            "path": LOG_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stdout.0-3": {
+            "path": STDOUT_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stderr.0-3": {
+            "path": STDERR_FILE + ".0-3",
+            "collect_default": True},
+        "streams_log.0-4": {
+            "path": LOG_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stdout.0-4": {
+            "path": STDOUT_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stderr.0-4": {
+            "path": STDERR_FILE + ".0-4",
+            "collect_default": True},
+        "streams_log.0-5": {
+            "path": LOG_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stdout.0-5": {
+            "path": STDOUT_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stderr.0-5": {
+            "path": STDERR_FILE + ".0-5",
+            "collect_default": True},
+        "streams_log.0-6": {
+            "path": LOG_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stdout.0-6": {
+            "path": STDOUT_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stderr.0-6": {
+            "path": STDERR_FILE + ".0-6",
+            "collect_default": True},
+        "streams_log.1-1": {
+            "path": LOG_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stdout.1-1": {
+            "path": STDOUT_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stderr.1-1": {
+            "path": STDERR_FILE + ".1-1",
+            "collect_default": True},
+        "streams_log.1-2": {
+            "path": LOG_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stdout.1-2": {
+            "path": STDOUT_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stderr.1-2": {
+            "path": STDERR_FILE + ".1-2",
+            "collect_default": True},
+        "streams_log.1-3": {
+            "path": LOG_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stdout.1-3": {
+            "path": STDOUT_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stderr.1-3": {
+            "path": STDERR_FILE + ".1-3",
+            "collect_default": True},
+        "streams_log.1-4": {
+            "path": LOG_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stdout.1-4": {
+            "path": STDOUT_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stderr.1-4": {
+            "path": STDERR_FILE + ".1-4",
+            "collect_default": True},
+        "streams_log.1-5": {
+            "path": LOG_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stdout.1-5": {
+            "path": STDOUT_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stderr.1-5": {
+            "path": STDERR_FILE + ".1-5",
+            "collect_default": True},
+        "streams_log.1-6": {
+            "path": LOG_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stdout.1-6": {
+            "path": STDOUT_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stderr.1-6": {
+            "path": STDERR_FILE + ".1-6",
+            "collect_default": True},
         "jmx_log": {
             "path": JMX_LOG_FILE,
             "collect_default": True},
@@ -120,7 +231,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
 
     def clean_node(self, node):
         node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        if self.CLEAN_NODE_ENABLED:
+            node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
 
     def start_cmd(self, node):
         args = self.args.copy()
@@ -141,13 +253,13 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
 
         return cmd
 
-    def prop_file(self, node):
+    def prop_file(self):
         cfg = KafkaConfig(**{STATE_DIR: self.PERSISTENT_ROOT})
         return cfg.render()
 
     def start_node(self, node):
         node.account.mkdirs(self.PERSISTENT_ROOT)
-        prop_file = self.prop_file(node)
+        prop_file = self.prop_file()
         node.account.create_file(self.CONFIG_FILE, prop_file)
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
 
@@ -189,7 +301,28 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
+        self.DISABLE_AUTO_TERMINATE = ""
 
+    def disable_auto_terminate(self):
+        self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['config_file'] = self.CONFIG_FILE
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
+              " %(kafka)s %(config_file)s %(user_test_args)s %(disable_auto_terminate)s" \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
 
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
@@ -273,3 +406,48 @@ class StreamsRepeatingIntegerKeyProducerService(StreamsTestBaseService):
                                                                         kafka,
                                                                         "org.apache.kafka.streams.tests.StreamsRepeatingIntegerKeyProducer",
                                                                         configs)
+class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
+                                                                 kafka,
+                                                                 "org.apache.kafka.streams.tests.StreamsUpgradeTest",
+                                                                 "")
+        self.UPGRADE_FROM = None
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
+
+    def prop_file(self):
+        properties = {STATE_DIR: self.PERSISTENT_ROOT}
+        if self.UPGRADE_FROM is not None:
+            properties['upgrade.from'] = self.UPGRADE_FROM
+
+        cfg = KafkaConfig(**properties)
+        return cfg.render()
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
+            args['zk'] = self.kafka.zk.connect_setting()
+        else:
+            args['zk'] = ""
+        args['config_file'] = self.CONFIG_FILE
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['version'] = self.KAFKA_STREAMS_VERSION
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+              " %(kafka_run_class)s %(streams_class_name)s  %(kafka)s %(zk)s %(config_file)s " \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        self.logger.info("Executing: " + cmd)
+
+        return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 3b38ff6..fa79d57 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -13,25 +13,50 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
-from ducktape.mark import ignore
-from ducktape.mark import matrix
+from ducktape.mark import ignore, matrix, parametrize
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsUpgradeTestJobRunnerService
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import LATEST_0_10_2, LATEST_0_11, LATEST_1_0, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, DEV_BRANCH, DEV_VERSION, KafkaVersion
+import random
+import time
 
-upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11), str(LATEST_1_0), str(DEV_BRANCH)]
+broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(DEV_BRANCH)]
+simple_upgrade_versions_metadata_version_2 = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(DEV_VERSION)]
 
 class StreamsUpgradeTest(Test):
     """
-    Tests rolling upgrades and downgrades of the Kafka Streams library.
+    Test upgrading Kafka Streams (all version combination)
+    If metadata was changes, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0
     """
 
     def __init__(self, test_context):
         super(StreamsUpgradeTest, self).__init__(test_context)
+        self.topics = {
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 },
+        }
+
+    def perform_broker_upgrade(self, to_version):
+        self.logger.info("First pass bounce - rolling broker upgrade")
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            node.version = KafkaVersion(to_version)
+            self.kafka.start_node(node)
+
+    @cluster(num_nodes=6)
+    @matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
+    def test_upgrade_downgrade_brokers(self, from_version, to_version):
+        """
+        Start a smoke test client then perform rolling upgrades on the broker.
+        """
+
+        if from_version == to_version:
+            return
+
         self.replication = 3
         self.partitions = 1
         self.isr = 2
@@ -58,112 +83,272 @@ class StreamsUpgradeTest(Test):
                        'configs': {"min.insync.replicas": self.isr} }
         }
 
+        # Setup phase
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
 
-    def perform_streams_upgrade(self, to_version):
-        self.logger.info("First pass bounce - rolling streams upgrade")
+        # number of nodes needs to be >= 3 for the smoke test
+        self.kafka = KafkaService(self.test_context, num_nodes=3,
+                                  zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
+        self.kafka.start()
 
-        # get the node running the streams app
-        node = self.processor1.node
-        self.processor1.stop()
+        # allow some time for topics to be created
+        time.sleep(10)
 
-        # change it's version. This will automatically make it pick up a different
-        # JAR when it starts again
-        node.version = KafkaVersion(to_version)
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+        
+        self.driver.start()
         self.processor1.start()
+        time.sleep(15)
 
-    def perform_broker_upgrade(self, to_version):
-        self.logger.info("First pass bounce - rolling broker upgrade")
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            node.version = KafkaVersion(to_version)
-            self.kafka.start_node(node)
+        self.perform_broker_upgrade(to_version)
 
-    @ignore
-    @cluster(num_nodes=6)
-    @matrix(from_version=upgrade_versions, to_version=upgrade_versions)
-    def test_upgrade_downgrade_streams(self, from_version, to_version):
-        """
-        Start a smoke test client, then abort (kill -9) and restart it a few times.
-        Ensure that all records are delivered.
+        time.sleep(15)
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor1.stop()
 
-        Note, that just like tests/core/upgrade_test.py, a prerequisite for this test to succeed
-        if the inclusion of all parametrized versions of kafka in kafka/vagrant/base.sh 
-        (search for get_kafka()). For streams in particular, that means that someone has manually
-        copies the kafka-stream-$version-test.jar in the right S3 bucket as shown in base.sh.
+        node = self.driver.node
+        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+
+    @matrix(from_version=simple_upgrade_versions_metadata_version_2, to_version=simple_upgrade_versions_metadata_version_2)
+    def test_simple_upgrade_downgrade(self, from_version, to_version):
+        """
+        Starts 3 KafkaStreams instances with <old_version>, and upgrades one-by-one to <new_version>
         """
-        if from_version != to_version:
-            # Setup phase
-            self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            self.zk.start()
 
-            # number of nodes needs to be >= 3 for the smoke test
-            self.kafka = KafkaService(self.test_context, num_nodes=3,
-                                      zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
-            self.kafka.start()
+        if from_version == to_version:
+            return
 
-            # allow some time for topics to be created
-            time.sleep(10)
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
 
-            self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-            self.driver.node.version = KafkaVersion(from_version)
-            self.driver.start()
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+        self.kafka.start()
 
-            self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-            self.processor1.node.version = KafkaVersion(from_version)
-            self.processor1.start()
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
 
-            time.sleep(15)
+        self.driver.start()
+        self.start_all_nodes_with(from_version)
 
-            self.perform_streams_upgrade(to_version)
+        self.processors = [self.processor1, self.processor2, self.processor3]
 
-            time.sleep(15)
-            self.driver.wait()
-            self.driver.stop()
+        counter = 1
+        random.seed()
 
-            self.processor1.stop()
+        # upgrade one-by-one via rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, None, to_version, counter)
+            counter = counter + 1
 
-            self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
-            self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
 
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
 
-    @ignore
-    @cluster(num_nodes=6)
-    @matrix(from_version=upgrade_versions, to_version=upgrade_versions)
-    def test_upgrade_brokers(self, from_version, to_version):
+        self.driver.stop()
+
+    #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released
+    #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released
+    #@parametrize(new_version=str(LATEST_0_11_0)) we cannot run this test until Kafka 0.11.0.3 is released
+    #@parametrize(new_version=str(LATEST_1_0)) we cannot run this test until Kafka 1.0.2 is released
+    #@parametrize(new_version=str(LATEST_1_1)) we cannot run this test until Kafka 1.1.1 is released
+    @parametrize(new_version=str(DEV_VERSION))
+    def test_metadata_upgrade(self, new_version):
         """
-        Start a smoke test client then perform rolling upgrades on the broker.
+        Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version>
         """
-        if from_version != to_version:
-            # Setup phase
-            self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            self.zk.start()
 
-            # number of nodes needs to be >= 3 for the smoke test
-            self.kafka = KafkaService(self.test_context, num_nodes=3,
-                                      zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
-            self.kafka.start()
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+        self.kafka.start()
+
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+
+        self.driver.start()
+        self.start_all_nodes_with(str(LATEST_0_10_0))
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        counter = 1
+        random.seed()
+
+        # first rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+            counter = counter + 1
+
+        # second rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            self.do_rolling_bounce(p, None, new_version, counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+        self.driver.stop()
+
+    def start_all_nodes_with(self, version):
+        # start first with <version>
+        self.prepare_for(self.processor1, version)
+        node1 = self.processor1.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
+            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
+                self.processor1.start()
+                log_monitor.wait_until("Kafka version : " + version,
+                                       timeout_sec=60,
+                                       err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
+                monitor.wait_until("processed 100 records from topic",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+
+        # start second with <version>
+        self.prepare_for(self.processor2, version)
+        node2 = self.processor2.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
+                    self.processor2.start()
+                    log_monitor.wait_until("Kafka version : " + version,
+                                           timeout_sec=60,
+                                           err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
+                    first_monitor.wait_until("processed 100 records from topic",
+                                             timeout_sec=60,
+                                             err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+                    second_monitor.wait_until("processed 100 records from topic",
+                                              timeout_sec=60,
+                                              err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+
+        # start third with <version>
+        self.prepare_for(self.processor3, version)
+        node3 = self.processor3.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
+                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
+                        self.processor3.start()
+                        log_monitor.wait_until("Kafka version : " + version,
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
+                        first_monitor.wait_until("processed 100 records from topic",
+                                                 timeout_sec=60,
+                                                 err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+                        second_monitor.wait_until("processed 100 records from topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+                        third_monitor.wait_until("processed 100 records from topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
+
+    @staticmethod
+    def prepare_for(processor, version):
+        processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
+        if version == str(DEV_VERSION):
+            processor.set_version("")  # set to TRUNK
+        else:
+            processor.set_version(version)
+
+    def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
+        first_other_processor = None
+        second_other_processor = None
+        for p in self.processors:
+            if p != processor:
+                if first_other_processor is None:
+                    first_other_processor = p
+                else:
+                    second_other_processor = p
+
+        node = processor.node
+        first_other_node = first_other_processor.node
+        second_other_node = second_other_processor.node
 
-            # allow some time for topics to be created
-            time.sleep(10)
+        # stop processor and wait for rebalance of others
+        with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+            with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                processor.stop()
+                first_other_monitor.wait_until("processed 100 records from topic",
+                                               timeout_sec=60,
+                                               err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+                second_other_monitor.wait_until("processed 100 records from topic",
+                                                timeout_sec=60,
+                                                err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+        node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
 
-            # use the current (dev) version driver
-            self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-            self.driver.node.version = KafkaVersion(from_version)
-            self.driver.start()
+        if upgrade_from is None:  # upgrade disabled -- second round of rolling bounces
+            roll_counter = ".1-"  # second round of rolling bounces
+        else:
+            roll_counter = ".0-"  # first  round of rolling boundes
 
-            self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-            self.processor1.node.version = KafkaVersion(from_version)
-            self.processor1.start()
+        node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False)
 
-            time.sleep(15)
+        if new_version == str(DEV_VERSION):
+            processor.set_version("")  # set to TRUNK
+        else:
+            processor.set_version(new_version)
+        processor.set_upgrade_from(upgrade_from)
 
-            self.perform_broker_upgrade(to_version)
+        grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+                    with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                        processor.start()
 
-            time.sleep(15)
-            self.driver.wait()
-            self.driver.stop()
+                        log_monitor.wait_until("Kafka version : " + new_version,
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
+                        first_other_monitor.wait_until("processed 100 records from topic",
+                                                       timeout_sec=60,
+                                                       err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+                        found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
 
-            self.processor1.stop()
+                        second_other_monitor.wait_until("processed 100 records from topic",
+                                                        timeout_sec=60,
+                                                        err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+                        found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
 
-            self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
-            self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+                        monitor.wait_until("processed 100 records from topic",
+                                           timeout_sec=60,
+                                           err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index b7071e7..66e5fcf 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -61,6 +61,7 @@ def get_version(node=None):
         return DEV_BRANCH
 
 DEV_BRANCH = KafkaVersion("dev")
+DEV_VERSION = KafkaVersion("1.2.0-SNAPSHOT")
 
 # 0.8.2.X versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")
@@ -89,7 +90,7 @@ LATEST_0_10_2 = V_0_10_2_1
 
 LATEST_0_10 = LATEST_0_10_2
 
-# 0.11.0.0 versions
+# 0.11.0.x versions
 V_0_11_0_0 = KafkaVersion("0.11.0.0")
 V_0_11_0_1 = KafkaVersion("0.11.0.1")
 V_0_11_0_2 = KafkaVersion("0.11.0.2")
diff --git a/vagrant/base.sh b/vagrant/base.sh
index bfc3496..f5c03cc 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -99,8 +99,10 @@ popd
 popd
 popd
 
-# Test multiple Scala versions
-get_kafka 0.8.2.2 2.10
+# Test multiple Kafka versions
+# we want to use the latest Scala version per Kafka version
+# however, we cannot pull in Scala 2.12 builds atm, because Scala 2.12 requires Java 8, but we use Java 7 to run the system tests
+get_kafka 0.8.2.2 2.11
 chmod a+rw /opt/kafka-0.8.2.2
 get_kafka 0.9.0.1 2.11
 chmod a+rw /opt/kafka-0.9.0.1
@@ -112,10 +114,10 @@ get_kafka 0.10.2.1 2.11
 chmod a+rw /opt/kafka-0.10.2.1
 get_kafka 0.11.0.2 2.11
 chmod a+rw /opt/kafka-0.11.0.2
-get_kafka 1.0.0 2.11
-chmod a+rw /opt/kafka-1.0.0
 get_kafka 1.0.1 2.11
 chmod a+rw /opt/kafka-1.0.1
+get_kafka 1.1.0 2.11
+chmod a+rw /opt/kafka-1.1.0
 
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message