kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 0.10.2 updated: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4758)
Date Mon, 26 Mar 2018 23:31:50 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.2 by this push:
     new 67b0a94  KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4758)
67b0a94 is described below

commit 67b0a94fa2856928809435ad8f442dd3c96ba544
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Mon Mar 26 16:31:47 2018 -0700

    KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4758)
    
    Introduces new config parameter `upgrade.from`.
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 bin/kafka-run-class.sh                             |  40 +++-
 build.gradle                                       |  24 ++
 .../authenticator/SaslClientCallbackHandler.java   |   9 +-
 docs/streams.html                                  |  39 +++-
 docs/upgrade.html                                  |  22 ++
 gradle/dependencies.gradle                         |   4 +
 settings.gradle                                    |   3 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  19 +-
 .../internals/StreamPartitionAssignor.java         |  18 +-
 .../internals/assignment/AssignmentInfo.java       |   7 +-
 .../internals/assignment/SubscriptionInfo.java     |   5 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  36 +--
 .../apache/kafka/streams/StreamsConfigTest.java    |  42 ++--
 .../streams/integration/FanoutIntegrationTest.java |   2 +
 .../KStreamAggregationDedupIntegrationTest.java    |  11 +-
 .../KStreamAggregationIntegrationTest.java         |  12 +-
 .../integration/QueryableStateIntegrationTest.java |   7 +-
 .../internals/StreamPartitionAssignorTest.java     | 125 ++++++++---
 .../internals/assignment/AssignmentInfoTest.java   |   3 +-
 .../kafka/streams/tests/SmokeTestClient.java       |  10 +-
 .../kafka/streams/tests/SmokeTestDriver.java       |  38 ++--
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  11 +-
 .../kafka/streams/tests/StreamsSmokeTest.java      |  14 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  73 +++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 104 +++++++++
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 114 ++++++++++
 tests/kafkatest/services/streams.py                | 173 ++++++++++++++-
 .../tests/streams/streams_upgrade_test.py          | 242 +++++++++++++++++++++
 tests/kafkatest/version.py                         |   1 +
 vagrant/base.sh                                    |   2 +
 30 files changed, 1060 insertions(+), 150 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index af10f61..a258681 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
+  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index 20a184c..5e97f90 100644
--- a/build.gradle
+++ b/build.gradle
@@ -770,6 +770,30 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0101
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':log4j-appender') {
   archivesBaseName = "kafka-log4j-appender"
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 6094b54..b80dfcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -17,7 +17,9 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -26,10 +28,7 @@ import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL mechanism
diff --git a/docs/streams.html b/docs/streams.html
index fe0e84e..d691e63 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -808,20 +808,49 @@ $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
         </p>
 
         <p>
+        Upgrading from 0.10.0.x to 0.10.2.x directly is also possible.
+        See <a href="#streams_api_changes_0102">Streams API changes in 0.10.2</a> and <a href="#streams_api_changes_0101">Streams API changes in 0.10.1</a>
+        for a complete list of API changes.
+        Upgrading to 0.10.2.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+        (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+        As an alternative, an offline upgrade is also possible.
+        </p>
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.2.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+        <p> Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported) </p>
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (0.10.2.0 or 0.10.2.1) application instances </li>
+        </ul>
+
+        <p>
         If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams">Upgrade Section for 0.10.1</a>.
         It highlights incompatible changes you need to consider to upgrade your code and application.
         See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
         </p>
 
-         <h3><a id="streams_api_changes_01021" href="#streams_api_changes_0102">Notable changes in 0.10.2.1</a></h3>
-         <p>
+        <h3><a id="streams_api_changes_01022" href="#streams_api_changes_0102">Notable changes in 0.10.2.2</a></h3>
+        <p>
+            Parameter updates in <code>StreamsConfig</code>:
+        </p>
+        <ul>
+            <li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+        </ul>
+
+        <h3><a id="streams_api_changes_01021" href="#streams_api_changes_0102">Notable changes in 0.10.2.1</a></h3>
+        <p>
             Parameter updates in <code>StreamsConfig</code>:
         </p>
-          <ul>
+        <ul>
             <li> of particular importance to improve the resiliency of a Kafka Streams application are two changes to default parameters of producer <code>retries</code> and consumer <code>max.poll.interval.ms</code> </li>
-          </ul>
-        <h3><a id="streams_api_changes_0102" href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
+        </ul>
 
+        <h3><a id="streams_api_changes_0102" href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
         <p>
             New methods in <code>KafkaStreams</code>:
         </p>
diff --git a/docs/upgrade.html b/docs/upgrade.html
index d7581fa..7747762 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -61,6 +61,11 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
     <li> See <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
 </ul>
 
+<h5><a id="upgrade_10202_notable" href="#upgrade_10202_notable">Notable changes in 0.10.2.2</a></h5>
+<ul>
+    <li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+</ul>
+
 <h5><a id="upgrade_10201_notable" href="#upgrade_10201_notable">Notable changes in 0.10.2.1</a></h5>
 <ul>
   <li> The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer <code>retries</code> default value was changed from 0 to 10. The internal Kafka Streams consumer <code>max.poll.interval.ms</code>  default value was changed from 300000 to <code>Integer.MAX_VALUE</code>.
@@ -141,6 +146,23 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
     <li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers. </li>
     <li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
          Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+    <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+    </li>
+    <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li>
+        </ul>
+    </li>
 </ul>
 
 <h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 25faa90..4084b12 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -31,6 +31,8 @@ versions += [
   jackson: "2.8.5",
   jetty: "9.2.22.v20170606",
   jersey: "2.24",
+  kafka_0100: "0.10.0.1",
+  kafka_0101: "0.10.1.1",
   log4j: "1.2.17",
   jopt: "5.0.3",
   junit: "4.12",
@@ -92,6 +94,8 @@ libs += [
   junit: "junit:junit:$versions.junit",
   log4j: "log4j:log4j:$versions.log4j",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
+  kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
+  kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
   lz4: "net.jpountz.lz4:lz4:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
   powermock: "org.powermock:powermock-module-junit4:$versions.powermock",
diff --git a/settings.gradle b/settings.gradle
index 29d3895..576b40b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100',
+        'streams:upgrade-system-tests-0101', 'log4j-appender',
         'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0724571..3baa078 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -95,6 +95,16 @@ public class StreamsConfig extends AbstractConfig {
      */
     public static final String PRODUCER_PREFIX = "producer.";
 
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
+     */
+    public static final String UPGRADE_FROM_0100 = "0.10.0";
+
+    /** {@code upgrade.from} */
+    public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
+    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
+        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+
     /** {@code state.dir} */
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state store.";
@@ -383,7 +393,13 @@ public class StreamsConfig extends AbstractConfig {
                     40 * 1000,
                     atLeast(0),
                     ConfigDef.Importance.MEDIUM,
-                    REQUEST_TIMEOUT_MS_DOC);
+                    REQUEST_TIMEOUT_MS_DOC)
+            .define(UPGRADE_FROM_CONFIG,
+                    ConfigDef.Type.STRING,
+                    null,
+                    in(null, UPGRADE_FROM_0100),
+                    ConfigDef.Importance.LOW,
+                    UPGRADE_FROM_DOC);
     }
 
     // this is the list of configs for underlying clients
@@ -501,6 +517,7 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
 
         // add configs required for stream partition assignor
+        consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
         consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index a50a819..889d2ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -155,6 +154,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private String userEndPoint;
     private int numStandbyReplicas;
 
+    private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
+
     private Cluster metadataWithInternalTopics;
     private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
 
@@ -182,6 +183,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
+        final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
+            log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
+            userMetadataVersion = 1;
+        }
+
         Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
         if (o == null) {
             KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -241,7 +248,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Set<TaskId> prevTasks = streamThread.prevTasks();
         Set<TaskId> standbyTasks = streamThread.cachedTasks();
         standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPoint);
+        SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, streamThread.processId, prevTasks, standbyTasks, this.userEndPoint);
 
         if (streamThread.builder.sourceTopicPattern() != null) {
             SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
@@ -279,11 +286,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // construct the client metadata from the decoded subscription info
         Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
 
+        int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
             Subscription subscription = entry.getValue();
 
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+            final int usedVersion = info.version;
+            if (usedVersion < minUserMetadataVersion) {
+                minUserMetadataVersion = usedVersion;
+            }
 
             // create the new client metadata if necessary
             ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
@@ -539,7 +551,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 }
 
                 // finally, encode the assignment before sending back to coordinator
-                assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
+                assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode()));
                 i++;
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index ddbd67d..7a6bf14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.common.record.ByteBufferInputStream;
@@ -56,7 +55,7 @@ public class AssignmentInfo {
         this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+    public AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
                              Map<HostInfo, Set<TopicPartition>> hostState) {
         this.version = version;
         this.activeTasks = activeTasks;
@@ -155,9 +154,7 @@ public class AssignmentInfo {
                 }
             }
 
-            return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
-
-
+            return new AssignmentInfo(version, activeTasks, standbyTasks, hostStateToTopicPartitions);
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index c3481c0..92c50a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -32,7 +31,7 @@ public class SubscriptionInfo {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    private static final int CURRENT_VERSION = 2;
+    public static final int CURRENT_VERSION = 2;
 
     public final int version;
     public final UUID processId;
@@ -44,7 +43,7 @@ public class SubscriptionInfo {
         this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
     }
 
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
+    public SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
         this.version = version;
         this.processId = processId;
         this.prevTasks = prevTasks;
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 50ab117..832883a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -96,7 +96,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testStateCloseAfterCreate() throws Exception {
+    public void testStateCloseAfterCreate() {
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
 
@@ -160,7 +160,7 @@ public class KafkaStreamsTest {
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic", "anyStore");
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
-        final KafkaStreams streams = new KafkaStreams(builder, props);
+        new KafkaStreams(builder, props);
 
         testStateThreadCloseHelper(numThreads);
     }
@@ -200,9 +200,8 @@ public class KafkaStreamsTest {
 
     }
 
-
     @Test
-    public void testInitializesAndDestroysMetricsReporters() throws Exception {
+    public void testInitializesAndDestroysMetricsReporters() {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -217,7 +216,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testCloseIsIdempotent() throws Exception {
+    public void testCloseIsIdempotent() {
         streams.close();
         final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
@@ -227,7 +226,7 @@ public class KafkaStreamsTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testCannotStartOnceClosed() throws Exception {
+    public void testCannotStartOnceClosed() {
         streams.start();
         streams.close();
         try {
@@ -241,7 +240,7 @@ public class KafkaStreamsTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testCannotStartTwice() throws Exception {
+    public void testCannotStartTwice() {
         streams.start();
 
         try {
@@ -267,10 +266,10 @@ public class KafkaStreamsTest {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
         final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
-
+        new KafkaStreams(builder, props);
     }
 
     @Test
@@ -278,6 +277,7 @@ public class KafkaStreamsTest {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
         final KStreamBuilder builder1 = new KStreamBuilder();
         final KafkaStreams streams1 = new KafkaStreams(builder1, props);
@@ -285,27 +285,26 @@ public class KafkaStreamsTest {
 
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString());
         final KStreamBuilder builder2 = new KStreamBuilder();
-        final KafkaStreams streams2 = new KafkaStreams(builder2, props);
-
+        new KafkaStreams(builder2, props);
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+    public void shouldNotGetAllTasksWhenNotRunning() {
         streams.allMetadata();
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
         streams.allMetadataForStore("store");
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
         streams.metadataForKey("store", "key", Serdes.String().serializer());
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
         streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
             @Override
             public Integer partition(final String key, final Object value, final int numPartitions) {
@@ -321,6 +320,7 @@ public class KafkaStreamsTest {
             final Properties props = new Properties();
             props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
             props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+            props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
             props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
             final KStreamBuilder builder = new KStreamBuilder();
@@ -366,16 +366,18 @@ public class KafkaStreamsTest {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         return new KafkaStreams(builder, props);
     }
 
     @Test
-    public void testCleanup() throws Exception {
+    public void testCleanup() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -391,6 +393,7 @@ public class KafkaStreamsTest {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -448,6 +451,5 @@ public class KafkaStreamsTest {
                 streams.close();
             }
         }
-
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 15cc1af..ab8701f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -60,7 +60,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetProducerConfigs() throws Exception {
+    public void testGetProducerConfigs() {
         Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
         assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer");
         assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
@@ -68,7 +68,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetConsumerConfigs() throws Exception {
+    public void testGetConsumerConfigs() {
         Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
@@ -77,7 +77,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetRestoreConsumerConfigs() throws Exception {
+    public void testGetRestoreConsumerConfigs() {
         Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
@@ -86,7 +86,7 @@ public class StreamsConfigTest {
 
     @Test
     public void defaultSerdeShouldBeConfigured() {
-        Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+        Map<String, Object> serializerConfigs = new HashMap<>();
         serializerConfigs.put("key.serializer.encoding", "UTF8");
         serializerConfigs.put("value.serializer.encoding", "UTF-16");
         Serializer<String> serializer = Serdes.String().serializer();
@@ -117,7 +117,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -127,7 +127,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedRestoreConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -137,7 +137,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
@@ -145,7 +145,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
@@ -153,7 +153,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(producerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -162,7 +162,7 @@ public class StreamsConfigTest {
 
 
     @Test
-    public void shouldSupportPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportPrefixedProducerConfigs() {
         props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
         props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -172,7 +172,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -182,7 +182,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -192,7 +192,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportNonPrefixedProducerConfigs() {
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -201,24 +201,22 @@ public class StreamsConfigTest {
         assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
-
-
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.keySerde();
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.valueSerde();
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConfigs() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -228,7 +226,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultProducerConfigs() {
         props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
@@ -236,7 +234,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -246,14 +244,14 @@ public class StreamsConfigTest {
     }
 
     @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.getConsumerConfigs(null, "a", "b");
     }
 
     @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.getRestoreConsumerConfigs("client");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 1d2a3e2..1256824 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -112,6 +113,7 @@ public class FanoutIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 6a8c7ff..3e0d80a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -10,6 +10,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -44,8 +45,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
-import kafka.utils.MockTime;
-
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
@@ -127,7 +126,7 @@ public class KStreamAggregationDedupIntegrationTest {
         List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer(),
-                5);
+            5);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -177,7 +176,7 @@ public class KStreamAggregationDedupIntegrationTest {
         List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer(),
-                10);
+            10);
 
         Comparator<KeyValue<String, String>>
             comparator =
@@ -229,7 +228,7 @@ public class KStreamAggregationDedupIntegrationTest {
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer(),
-                5);
+            5);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
@@ -303,6 +302,4 @@ public class KStreamAggregationDedupIntegrationTest {
 
     }
 
-
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index bd5911d..13124f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -155,7 +155,7 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer(),
-                10);
+            10);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -209,7 +209,7 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer(),
-                15);
+            15);
 
         final Comparator<KeyValue<String, String>>
             comparator =
@@ -263,7 +263,7 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValue<String, Integer>> results = receiveMessages(
             new StringDeserializer(),
             new IntegerDeserializer(),
-                10);
+            10);
 
         Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
             @Override
@@ -313,7 +313,7 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
             new StringDeserializer(),
             new IntegerDeserializer(),
-                15);
+            15);
 
         final Comparator<KeyValue<String, Integer>>
             comparator =
@@ -364,7 +364,7 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer(),
-                10);
+            10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
@@ -406,7 +406,7 @@ public class KStreamAggregationIntegrationTest {
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer(),
-                10);
+            10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 64e8459..d09d505 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -129,7 +129,7 @@ public class QueryableStateIntegrationTest {
     }
 
     @Before
-    public void before() throws IOException, InterruptedException {
+    public void before() throws Exception {
         testNo++;
         createTopics();
         streamsConfiguration = new Properties();
@@ -609,15 +609,13 @@ public class QueryableStateIntegrationTest {
      * @param failIfKeyNotFound     if true, tests fails if an expected key is not found in store. If false,
      *                              the method merely inserts the new found key into the list of
      *                              expected keys.
-     * @throws InterruptedException
      */
     private void verifyGreaterOrEqual(final String[] keys,
                                       final Map<String, Long> expectedWindowedCount,
                                       final Map<String, Long> expectedCount,
                                       final ReadOnlyWindowStore<String, Long> windowStore,
                                       final ReadOnlyKeyValueStore<String, Long> keyValueStore,
-                                      final boolean failIfKeyNotFound)
-        throws InterruptedException {
+                                      final boolean failIfKeyNotFound) {
         final Map<String, Long> windowState = new HashMap<>();
         final Map<String, Long> countState = new HashMap<>();
 
@@ -744,5 +742,4 @@ public class QueryableStateIntegrationTest {
         }
     }
 
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 6503038..e06ed73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -111,6 +112,7 @@ public class StreamPartitionAssignorTest {
             {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
@@ -119,7 +121,7 @@ public class StreamPartitionAssignorTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testSubscription() throws Exception {
+    public void testSubscription() {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -159,7 +161,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignBasic() throws Exception {
+    public void testAssignBasic() {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -227,7 +229,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithPartialTopology() throws Exception {
+    public void testAssignWithPartialTopology() {
         Properties props = configProps();
         props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class);
         StreamsConfig config = new StreamsConfig(props);
@@ -267,7 +269,7 @@ public class StreamPartitionAssignorTest {
 
 
     @Test
-    public void testAssignEmptyMetadata() throws Exception {
+    public void testAssignEmptyMetadata() {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -324,7 +326,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithNewTasks() throws Exception {
+    public void testAssignWithNewTasks() {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addSource("source3", "topic3");
@@ -381,7 +383,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithStates() throws Exception {
+    public void testAssignWithStates() {
         String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addSource("source1", "topic1");
@@ -470,7 +472,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithStandbyReplicas() throws Exception {
+    public void testAssignWithStandbyReplicas() {
         Properties props = configProps();
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         StreamsConfig config = new StreamsConfig(props);
@@ -543,7 +545,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testOnAssignment() throws Exception {
+    public void testOnAssignment() {
         TopicPartition t2p3 = new TopicPartition("topic2", 3);
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -576,7 +578,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithInternalTopics() throws Exception {
+    public void testAssignWithInternalTopics() {
         String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
@@ -612,7 +614,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
+    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
         String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
@@ -650,7 +652,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+    public void shouldAddUserDefinedEndPointToSubscription() {
         final Properties properties = configProps();
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
         final StreamsConfig config = new StreamsConfig(properties);
@@ -663,8 +665,8 @@ public class StreamPartitionAssignorTest {
         final UUID uuid1 = UUID.randomUUID();
         final String client1 = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                           0);
+        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1,
+            uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
         final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
@@ -673,7 +675,80 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+        subscriptions.put(
+            "consumer1",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+        subscriptions.put(
+            "consumer2",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        final TopologyBuilder builder = new TopologyBuilder();
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "appId",
+            "clientId",
+            UUID.randomUUID(),
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", "clientId"));
+        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+        assertEquals(2, assignment.size());
+        assertEquals(1, AssignmentInfo.decode(assignment.get("consumer1").userData()).version);
+        assertEquals(1, AssignmentInfo.decode(assignment.get("consumer2").userData()).version);
+    }
+
+    @Test
+    public void shouldDownGradeSubscription() {
+        final Properties properties = configProps();
+        properties.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);
+        StreamsConfig config = new StreamsConfig(properties);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+
+        String clientId = "client-id";
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "appId",
+            "clientId",
+            UUID.randomUUID(),
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0);
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", clientId));
+
+        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+        assertEquals(1, SubscriptionInfo.decode(subscription.userData()).version);
+    }
+
+    @Test
+    public void shouldMapUserEndPointToTopicPartitions() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost:8080";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -711,7 +786,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -736,7 +811,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost:j87yhk";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -760,7 +835,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
+    public void shouldExposeHostStateToTopicPartitionsOnAssignment() {
         List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
         final Map<HostInfo, Set<TopicPartition>> hostState =
                 Collections.singletonMap(new HostInfo("localhost", 80),
@@ -773,7 +848,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldSetClusterMetadataOnAssignment() throws Exception {
+    public void shouldSetClusterMetadataOnAssignment() {
         final List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
         final Map<HostInfo, Set<TopicPartition>> hostState =
                 Collections.singletonMap(new HostInfo("localhost", 80),
@@ -793,7 +868,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
+    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
         final Cluster cluster = partitionAssignor.clusterMetadata();
         assertNotNull(cluster);
     }
@@ -891,11 +966,11 @@ public class StreamPartitionAssignorTest {
             new TopicPartition(applicationId + "-count-repartition", 1),
             new TopicPartition(applicationId + "-count-repartition", 2)
         );
-        assertThat(new HashSet(assignment.get(client).partitions()), equalTo(new HashSet(expectedAssignment)));
+        assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment)));
     }
 
     @Test
-    public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception {
+    public void shouldUpdatePartitionHostInfoMapOnAssignment() {
         final TopicPartition partitionOne = new TopicPartition("topic", 1);
         final TopicPartition partitionTwo = new TopicPartition("topic", 2);
         final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
@@ -912,7 +987,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldUpdateClusterMetadataOnAssignment() throws Exception {
+    public void shouldUpdateClusterMetadataOnAssignment() {
         final TopicPartition topicOne = new TopicPartition("topic", 1);
         final TopicPartition topicTwo = new TopicPartition("topic2", 2);
         final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
@@ -928,7 +1003,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
+    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
         final Properties props = configProps();
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         final StreamsConfig config = new StreamsConfig(props);
@@ -976,12 +1051,12 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() {
         partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
     }
 
     @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() throws Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() {
         final Map<String, Object> config = new HashMap<>();
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am not a stream thread");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index cfa0e61..52c753d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -65,10 +65,9 @@ public class AssignmentInfoTest {
         assertEquals(oldVersion.activeTasks, decoded.activeTasks);
         assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
         assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1
-        assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+        assertEquals(1, decoded.version);
     }
 
-
     /**
      * This is a clone of what the V1 encoding did. The encode method has changed for V2
      * so it is impossible to test compatibility without having this
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index d192126..9f59b11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -115,7 +115,7 @@ public class SmokeTestClient extends SmokeTestUtil {
             }
         });
 
-        data.process(SmokeTestUtil.printProcessorSupplier("data"));
+        data.process(SmokeTestUtil.<String, Integer>printProcessorSupplier("data"));
 
         // min
         KGroupedStream<String, Integer>
@@ -141,7 +141,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, intSerde, "min");
 
         KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
-        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
+        minTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("min"));
 
         // max
         groupedData.aggregate(
@@ -163,7 +163,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, intSerde, "max");
 
         KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
-        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
+        maxTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("max"));
 
         // sum
         groupedData.aggregate(
@@ -186,7 +186,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
 
         KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
-        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
+        sumTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("sum"));
 
         // cnt
         groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2)), "uwin-cnt")
@@ -195,7 +195,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, longSerde, "cnt");
 
         KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
-        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
+        cntTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("cnt"));
 
         // dif
         maxTable.join(minTable,
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index c2cfd84..a0c2933 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -77,7 +77,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
     // This main() is not used by the system test. It is intended to be used for local debugging.
     public static void main(String[] args) throws Exception {
         final String kafka = "localhost:9092";
-        final String zookeeper = "localhost:2181";
         final File stateDir = TestUtils.tempDirectory();
 
         final int numKeys = 20;
@@ -131,42 +130,50 @@ public class SmokeTestDriver extends SmokeTestUtil {
     }
 
     public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
+        return generate(kafka, numKeys, maxRecordsPerKey, true);
+    }
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final boolean autoTerminate) throws Exception {
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        // the next 4 config values make sure that all records are produced with no loss and
-        // no duplicates
+        // the next 2 config values make sure that all records are produced with no loss and no duplicates
         producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
 
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
         int numRecordsProduced = 0;
 
-        Map<String, Set<Integer>> allData = new HashMap<>();
-        ValueList[] data = new ValueList[numKeys];
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
         for (int i = 0; i < numKeys; i++) {
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
             allData.put(data[i].key, new HashSet<Integer>());
         }
-        Random rand = new Random();
+        final Random rand = new Random();
 
-        int remaining = data.length;
+        int remaining = 1; // dummy value must be positive if <autoTerminate> is false
+        if (autoTerminate) {
+            remaining = data.length;
+        }
 
         while (remaining > 0) {
-            int index = rand.nextInt(remaining);
-            String key = data[index].key;
+            final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
+            final String key = data[index].key;
             int value = data[index].next();
 
-            if (value < 0) {
+            if (autoTerminate && value < 0) {
                 remaining--;
                 data[index] = data[remaining];
             } else {
 
-                ProducerRecord<byte[], byte[]> record =
-                        new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
+                final ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
 
                 producer.send(record, new Callback() {
                     @Override
@@ -178,11 +185,12 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     }
                 });
 
-
                 numRecordsProduced++;
                 allData.get(key).add(value);
-                if (numRecordsProduced % 100 == 0)
+
+                if (numRecordsProduced % 100 == 0) {
                     System.out.println(numRecordsProduced + " records produced");
+                }
                 Utils.sleep(2);
 
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 73fe27c..87ab60c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -33,8 +33,6 @@ import java.io.File;
 
 public class SmokeTestUtil {
 
-    public final static int WINDOW_SIZE = 100;
-    public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
     public final static int END = Integer.MAX_VALUE;
 
     public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
@@ -46,18 +44,15 @@ public class SmokeTestUtil {
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
-                    private ProcessorContext context;
 
                     @Override
                     public void init(ProcessorContext context) {
                         System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         numRecordsProcessed = 0;
-                        this.context = context;
                     }
 
                     @Override
                     public void process(Object key, Object value) {
-                        if (printOffset) System.out.println(">>> " + context.offset());
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
@@ -65,12 +60,10 @@ public class SmokeTestUtil {
                     }
 
                     @Override
-                    public void punctuate(long timestamp) {
-                    }
+                    public void punctuate(long timestamp) {}
 
                     @Override
-                    public void close() {
-                    }
+                    public void close() {}
                 };
             }
         };
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 304cae7..aa1def1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -24,7 +24,7 @@ import java.util.Set;
 public class StreamsSmokeTest {
 
     /**
-     *  args ::= command kafka zookeeper stateDir
+     *  args ::= kafka stateDir command [disableAutoTerminate]
      *  command := "run" | "process"
      *
      * @param args
@@ -33,11 +33,13 @@ public class StreamsSmokeTest {
         String kafka = args[0];
         String stateDir = args.length > 1 ? args[1] : null;
         String command = args.length > 2 ? args[2] : null;
+        boolean disableAutoTerminate = args.length > 3;
 
-        System.out.println("StreamsTest instance started");
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
         System.out.println("command=" + command);
         System.out.println("kafka=" + kafka);
         System.out.println("stateDir=" + stateDir);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
 
         switch (command) {
             case "standalone":
@@ -47,8 +49,12 @@ public class StreamsSmokeTest {
                 // this starts the driver (data generation and result verification)
                 final int numKeys = 10;
                 final int maxRecordsPerKey = 500;
-                Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
-                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                if (disableAutoTerminate) {
+                    SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
+                } else {
+                    Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
                 break;
             case "process":
                 // this starts a KafkaStreams client
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..17ff97e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires two argument (kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String stateDir = args[1];
+        final String upgradeFrom = args.length > 2 ? args[2] : null;
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("stateDir=" + stateDir);
+        System.out.println("upgradeFrom=" + upgradeFrom);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        if (upgradeFrom != null) {
+            config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+        }
+
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+}
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..7d3ed43
--- /dev/null
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String stateDir = args[2];
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..604fbe7
--- /dev/null
+++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be executed as long as Kafka 0.10.1.2 is not released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        String kafka = args[0];
+        String zookeeper = args[1];
+        String stateDir = args[2];
+        String upgradeFrom = args.length > 3 ? args[3] : null;
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+        System.out.println("upgradeFrom=" + upgradeFrom);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        if (upgradeFrom != null) {
+            // TODO: switch to `UPGRADE_FROM_CONFIG` once Kafka 0.10.1.2 is released; the constant is not available yet
+            //config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+            config.setProperty("upgrade.from", upgradeFrom);
+        }
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index e7be947..b7de568 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -20,6 +20,7 @@ from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
 
 
 class StreamsTestBaseService(KafkaPathResolverMixin, Service):
@@ -33,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
 
+    CLEAN_NODE_ENABLED = True
+
     logs = {
         "streams_log": {
             "path": LOG_FILE,
@@ -43,6 +46,114 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         "streams_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "streams_log.0-1": {
+            "path": LOG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stdout.0-1": {
+            "path": STDOUT_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stderr.0-1": {
+            "path": STDERR_FILE + ".0-1",
+            "collect_default": True},
+        "streams_log.0-2": {
+            "path": LOG_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stdout.0-2": {
+            "path": STDOUT_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stderr.0-2": {
+            "path": STDERR_FILE + ".0-2",
+            "collect_default": True},
+        "streams_log.0-3": {
+            "path": LOG_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stdout.0-3": {
+            "path": STDOUT_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stderr.0-3": {
+            "path": STDERR_FILE + ".0-3",
+            "collect_default": True},
+        "streams_log.0-4": {
+            "path": LOG_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stdout.0-4": {
+            "path": STDOUT_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stderr.0-4": {
+            "path": STDERR_FILE + ".0-4",
+            "collect_default": True},
+        "streams_log.0-5": {
+            "path": LOG_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stdout.0-5": {
+            "path": STDOUT_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stderr.0-5": {
+            "path": STDERR_FILE + ".0-5",
+            "collect_default": True},
+        "streams_log.0-6": {
+            "path": LOG_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stdout.0-6": {
+            "path": STDOUT_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stderr.0-6": {
+            "path": STDERR_FILE + ".0-6",
+            "collect_default": True},
+        "streams_log.1-1": {
+            "path": LOG_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stdout.1-1": {
+            "path": STDOUT_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stderr.1-1": {
+            "path": STDERR_FILE + ".1-1",
+            "collect_default": True},
+        "streams_log.1-2": {
+            "path": LOG_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stdout.1-2": {
+            "path": STDOUT_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stderr.1-2": {
+            "path": STDERR_FILE + ".1-2",
+            "collect_default": True},
+        "streams_log.1-3": {
+            "path": LOG_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stdout.1-3": {
+            "path": STDOUT_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stderr.1-3": {
+            "path": STDERR_FILE + ".1-3",
+            "collect_default": True},
+        "streams_log.1-4": {
+            "path": LOG_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stdout.1-4": {
+            "path": STDOUT_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stderr.1-4": {
+            "path": STDERR_FILE + ".1-4",
+            "collect_default": True},
+        "streams_log.1-5": {
+            "path": LOG_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stdout.1-5": {
+            "path": STDOUT_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stderr.1-5": {
+            "path": STDERR_FILE + ".1-5",
+            "collect_default": True},
+        "streams_log.1-6": {
+            "path": LOG_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stdout.1-6": {
+            "path": STDOUT_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stderr.1-6": {
+            "path": STDERR_FILE + ".1-6",
+            "collect_default": True},
     }
 
     def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None):
@@ -107,7 +218,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
 
     def clean_node(self, node):
         node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        if self.CLEAN_NODE_ENABLED:
+            node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
 
     def start_cmd(self, node):
         args = self.args.copy()
@@ -153,7 +265,28 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
+        self.DISABLE_AUTO_TERMINATE = ""
+
+    def disable_auto_terminate(self):
+        self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
 
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
+              " %(kafka)s %(state_dir)s %(user_test_args)s %(disable_auto_terminate)s" \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
 
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
@@ -171,3 +304,41 @@ class StreamsBrokerCompatibilityService(StreamsTestBaseService):
                                                                 kafka,
                                                                 "org.apache.kafka.streams.tests.BrokerCompatibilityTest",
                                                                 "dummy")
+
+class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
+                                                                 kafka,
+                                                                 "org.apache.kafka.streams.tests.StreamsUpgradeTest",
+                                                                 "")
+        self.UPGRADE_FROM = ""
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
+            args['zk'] = self.kafka.zk.connect_setting()
+        else:
+            args['zk'] = ""
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['version'] = self.KAFKA_STREAMS_VERSION
+        args['upgrade_from'] = self.UPGRADE_FROM
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+              " %(kafka_run_class)s %(streams_class_name)s " \
+              " %(kafka)s %(zk)s %(state_dir)s %(user_test_args)s %(upgrade_from)s" \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
new file mode 100644
index 0000000..294e354
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -0,0 +1,242 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsUpgradeTestJobRunnerService
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, DEV_VERSION
+import random
+
+class StreamsUpgradeTest(KafkaTest):
+    """
+    Test upgrading Kafka Streams (all version combinations)
+    If metadata was changed, upgrade is more difficult
+    Metadata version was bumped in 0.10.1.0
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+        self.processor1 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+        self.processor2 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+        self.processor3 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+
+    def test_simple_upgrade(self):
+        """
+        Starts 3 KafkaStreams instances with version 0.10.1, and upgrades one-by-one to 0.10.2
+        """
+
+        self.driver.start()
+        self.start_all_nodes_with(str(LATEST_0_10_1))
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        counter = 1
+        random.seed()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, "", str(DEV_VERSION), counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+        self.driver.stop()
+
+    #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released
+    @parametrize(new_version=str(DEV_VERSION))
+    def test_metadata_upgrade(self, new_version):
+        """
+        Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version>
+        """
+
+        self.driver.start()
+        self.start_all_nodes_with(str(LATEST_0_10_0))
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        counter = 1
+        random.seed()
+
+        # first rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+            counter = counter + 1
+
+        # second rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            self.do_rolling_bounce(p, "", new_version, counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+        self.driver.stop()
+
+    def start_all_nodes_with(self, version):
+        # start first with <version>
+        self.prepare_for(self.processor1, version)
+        node1 = self.processor1.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
+            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
+                self.processor1.start()
+                log_monitor.wait_until("Kafka version : " + version,
+                                       timeout_sec=60,
+                                       err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
+                monitor.wait_until("processed 100 records from topic",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+
+        # start second with <version>
+        self.prepare_for(self.processor2, version)
+        node2 = self.processor2.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
+                    self.processor2.start()
+                    log_monitor.wait_until("Kafka version : " + version,
+                                           timeout_sec=60,
+                                           err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
+                    first_monitor.wait_until("processed 100 records from topic",
+                                             timeout_sec=60,
+                                             err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+                    second_monitor.wait_until("processed 100 records from topic",
+                                              timeout_sec=60,
+                                              err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+
+        # start third with <version>
+        self.prepare_for(self.processor3, version)
+        node3 = self.processor3.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
+                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
+                        self.processor3.start()
+                        log_monitor.wait_until("Kafka version : " + version,
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
+                        first_monitor.wait_until("processed 100 records from topic",
+                                                 timeout_sec=60,
+                                                 err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+                        second_monitor.wait_until("processed 100 records from topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+                        third_monitor.wait_until("processed 100 records from topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
+
+    @staticmethod
+    def prepare_for(processor, version):
+        processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
+        processor.set_version(version)
+
+    def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
+        first_other_processor = None
+        second_other_processor = None
+        for p in self.processors:
+            if p != processor:
+                if first_other_processor is None:
+                    first_other_processor = p
+                else:
+                    second_other_processor = p
+
+        node = processor.node
+        first_other_node = first_other_processor.node
+        second_other_node = second_other_processor.node
+
+        # stop processor and wait for rebalance of others
+        with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+            with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                processor.stop()
+                first_other_monitor.wait_until("processed 100 records from topic",
+                                               timeout_sec=60,
+                                               err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+                second_other_monitor.wait_until("processed 100 records from topic",
+                                                timeout_sec=60,
+                                                err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+        node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
+
+        if upgrade_from == "":  # upgrade disabled -- second round of rolling bounces
+            roll_counter = ".1-"  # second round of rolling bounces
+        else:
+            roll_counter = ".0-"  # first  round of rolling bounces
+
+        node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False)
+
+        if new_version == str(DEV_VERSION):
+            processor.set_version("")  # set to TRUNK
+        else:
+            processor.set_version(new_version)
+        processor.set_upgrade_from(upgrade_from)
+
+        grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+                    with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                        processor.start()
+
+                        log_monitor.wait_until("Kafka version : " + new_version,
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
+                        first_other_monitor.wait_until("processed 100 records from topic",
+                                                       timeout_sec=60,
+                                                       err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+                        found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        second_other_monitor.wait_until("processed 100 records from topic",
+                                                        timeout_sec=60,
+                                                        err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+                        found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        monitor.wait_until("processed 100 records from topic",
+                                           timeout_sec=60,
+                                           err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
\ No newline at end of file
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 7cd489d..df95602 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -61,6 +61,7 @@ def get_version(node=None):
         return DEV_BRANCH
 
 DEV_BRANCH = KafkaVersion("dev")
+DEV_VERSION = KafkaVersion("0.10.2.2-SNAPSHOT")
 
 # 0.8.2.X versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 0bb0f30..70987c6 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -52,6 +52,8 @@ get_kafka() {
 
     kafka_dir=/opt/kafka-$version
     url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz
+    # the .tgz above does not include the streams test jar hence we need to get it separately
+    url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
     if [ ! -d /opt/kafka-$version ]; then
         pushd /tmp
         curl -O $url

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message