kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5037: Fix infinite loop if all input topics are unknown at startup
Date Thu, 19 Jul 2018 22:23:06 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 82f124a  KAFKA-5037: Fix infinite loop if all input topics are unknown at startup
82f124a is described below

commit 82f124ae30116b2b7d45878aaeb9699b6bdf1c30
Author: Ted Yu <yuzhihong@gmail.com>
AuthorDate: Thu Jul 19 15:22:53 2018 -0700

    KAFKA-5037: Fix infinite loop if all input topics are unknown at startup
    
    1. At the beginning of assign, we first check that all non-repartition source topics are present in the cluster metadata. If any are missing, the leader logs an error and sets an error code in the Assignment userData bytes, indicating that it cannot complete the assignment; the error code identifies the root cause.
    
    2. Upon receiving the assignment, if the error code is not NONE, the Streams client shuts itself down, logging the root cause interpreted from the error code (sketched below).
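    
    A minimal sketch of the resulting error-code round trip, distilled from the diff below (simplified: the real rebalance plumbing, and the empty task assignment the leader sends to every consumer, are omitted; this is illustration, not part of the commit):
    
        // Leader side: a missing source topic short-circuits the assignment;
        // the error code travels in the version-4 AssignmentInfo userData bytes.
        final int errorCode = StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code();
        final ByteBuffer userData = new AssignmentInfo(
            AssignmentInfo.LATEST_SUPPORTED_VERSION,
            Collections.emptyList(),  // no active tasks
            Collections.emptyMap(),   // no standby tasks
            Collections.emptyMap(),   // no partitions by host
            errorCode).encode();
    
        // Member side: onAssignment() decodes the userData and, instead of
        // creating tasks, records the error so the StreamThread shuts down.
        final AssignmentInfo info = AssignmentInfo.decode(userData);
        if (info.errCode() != StreamsPartitionAssignor.Error.NONE.code()) {
            assignmentErrorCode.set(info.errCode());  // AtomicInteger shared with StreamThread
        }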
    
    Author: tedyu <yuzhihong@gmail.com>
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
    
    Closes #5322 from tedyu/trunk
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   4 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |   2 +-
 .../streams/processor/DefaultPartitionGrouper.java |  17 +--
 .../streams/processor/internals/StreamThread.java  |  27 ++--
 .../internals/StreamsPartitionAssignor.java        | 126 +++++++++++++++----
 .../internals/assignment/AssignmentInfo.java       |  52 ++++++--
 .../internals/assignment/SubscriptionInfo.java     |  25 +++-
 .../apache/kafka/streams/KafkaStreamsWrapper.java  |  51 ++++++++
 .../integration/utils/IntegrationTestUtils.java    |  28 +++++
 .../processor/DefaultPartitionGrouperTest.java     |  10 +-
 .../CopartitionedTopicsValidatorTest.java          |  23 +---
 .../processor/internals/StreamThreadTest.java      |  20 +--
 .../internals/StreamsPartitionAssignorTest.java    |  31 ++---
 .../internals/assignment/AssignmentInfoTest.java   |  22 ++--
 .../internals/assignment/SubscriptionInfoTest.java |   7 ++
 .../kafka/streams/tests/StreamsUpgradeTest.java    |   2 +-
 ...StreamToTableJoinScalaIntegrationTestBase.scala | 137 +++++++++++++++++++++
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |  95 +-------------
 ...JoinWithIncompleteMetadataIntegrationTest.scala |  88 +++++++++++++
 .../tests/streams/streams_upgrade_test.py          |  12 +-
 20 files changed, 550 insertions(+), 229 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index cef8116..8ed80dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -136,7 +136,7 @@ public class KafkaStreams {
     private final String clientId;
     private final Metrics metrics;
     private final StreamsConfig config;
-    private final StreamThread[] threads;
+    protected final StreamThread[] threads;
     private final StateDirectory stateDirectory;
     private final StreamsMetadataState streamsMetadataState;
     private final ScheduledExecutorService stateDirCleaner;
@@ -209,7 +209,7 @@ public class KafkaStreams {
     }
 
     private final Object stateLock = new Object();
-    private volatile State state = State.CREATED;
+    protected volatile State state = State.CREATED;
 
     private boolean waitOnState(final State targetState, final long waitMs) {
         long begin = time.milliseconds();
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index eee5bc6..d1f7d93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -727,7 +727,7 @@ public class StreamsConfig extends AbstractConfig {
 
     public static class InternalConfig {
         public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
-        public static final String VERSION_PROBING_FLAG = "__version.probing.flag__";
+        public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index cee9488..712f8a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,18 +79,14 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
         int maxNumPartitions = 0;
         for (String topic : topics) {
             List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
-
             if (partitions.isEmpty()) {
+                log.error("Empty partitions for topic {}", topic);
+                throw new RuntimeException("Empty partitions for topic " + topic);
+            }
 
-                log.warn("Skipping creating tasks for the topic group {} since topic {}'s metadata is not available yet;"
-                         + " no tasks for this topic group will be assigned to any client.\n"
-                         + " Make sure all supplied topics in the topology are created before starting"
-                         + " as this could lead to unexpected results", topics, topic);
-                return StreamsPartitionAssignor.NOT_AVAILABLE;
-            } else {
-                int numPartitions = partitions.size();
-                if (numPartitions > maxNumPartitions)
-                    maxNumPartitions = numPartitions;
+            int numPartitions = partitions.size();
+            if (numPartitions > maxNumPartitions) {
+                maxNumPartitions = numPartitions;
             }
         }
         return maxNumPartitions;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f425bb4..31de839 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -63,7 +63,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
@@ -261,12 +260,18 @@ public class StreamThread extends Thread {
                 taskManager.suspendedActiveTaskIds(),
                 taskManager.suspendedStandbyTaskIds());
 
+            if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
+                log.debug("Received error code {} - shutdown", streamThread.assignmentErrorCode.get());
+                streamThread.shutdown();
+                streamThread.setStateListener(null);
+                return;
+            }
             final long start = time.milliseconds();
             try {
                 if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
                     return;
                 }
-                if (!streamThread.versionProbingFlag.get()) {
+                if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) {
                     taskManager.createTasks(assignment);
                 }
             } catch (final Throwable t) {
@@ -302,8 +307,8 @@ public class StreamThread extends Thread {
                 final long start = time.milliseconds();
                 try {
                     // suspend active tasks
-                    if (streamThread.versionProbingFlag.get()) {
-                        streamThread.versionProbingFlag.set(false);
+                    if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.VERSION_PROBING.code()) {
+                        streamThread.assignmentErrorCode.set(StreamsPartitionAssignor.Error.NONE.code());
                     } else {
                         taskManager.suspendTasksAndState();
                     }
@@ -563,7 +568,7 @@ public class StreamThread extends Thread {
     private final String logPrefix;
     private final TaskManager taskManager;
     private final StreamsMetricsThreadImpl streamsMetrics;
-    private final AtomicBoolean versionProbingFlag;
+    private final AtomicInteger assignmentErrorCode;
 
     private long lastCommitMs;
     private long timerStartedMs;
@@ -657,8 +662,8 @@ public class StreamThread extends Thread {
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, threadClientId);
         consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
-        final AtomicBoolean versionProbingFlag = new AtomicBoolean();
-        consumerConfigs.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, versionProbingFlag);
+        final AtomicInteger assignmentErrorCode = new AtomicInteger();
+        consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
         String originalReset = null;
         if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
             originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
@@ -679,7 +684,7 @@ public class StreamThread extends Thread {
             builder,
             threadClientId,
             logContext,
-            versionProbingFlag);
+            assignmentErrorCode);
     }
 
     public StreamThread(final Time time,
@@ -693,7 +698,7 @@ public class StreamThread extends Thread {
                         final InternalTopologyBuilder builder,
                         final String threadClientId,
                         final LogContext logContext,
-                        final AtomicBoolean versionProbingFlag) {
+                        final AtomicInteger assignmentErrorCode) {
         super(threadClientId);
 
         this.stateLock = new Object();
@@ -710,7 +715,7 @@ public class StreamThread extends Thread {
         this.restoreConsumer = restoreConsumer;
         this.consumer = consumer;
         this.originalReset = originalReset;
-        this.versionProbingFlag = versionProbingFlag;
+        this.assignmentErrorCode = assignmentErrorCode;
 
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
@@ -765,7 +770,7 @@ public class StreamThread extends Thread {
         while (isRunning()) {
             try {
                 recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit);
-                if (versionProbingFlag.get()) {
+                if (assignmentErrorCode.get() == StreamsPartitionAssignor.Error.VERSION_PROBING.code()) {
                     log.info("Version probing detected. Triggering new rebalance.");
                     enforceRebalance();
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index db94ac0..d81d4f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -51,7 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
@@ -59,16 +59,44 @@ import static org.apache.kafka.common.utils.Utils.getPort;
 public class StreamsPartitionAssignor implements PartitionAssignor, Configurable {
 
     private final static int UNKNOWN = -1;
-    public final static int NOT_AVAILABLE = -2;
     private final static int VERSION_ONE = 1;
     private final static int VERSION_TWO = 2;
     private final static int VERSION_THREE = 3;
+    private final static int VERSION_FOUR = 4;
     private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
     private int minReceivedMetadataVersion = UNKNOWN;
     protected Set<Integer> supportedVersions = new HashSet<>();
 
     private Logger log;
     private String logPrefix;
+    public enum Error {
+        NONE(0),
+        INCOMPLETE_SOURCE_TOPIC_METADATA(1),
+        VERSION_PROBING(2);
+
+        private final int code;
+
+        Error(final int code) {
+            this.code = code;
+        }
+
+        public int code() {
+            return code;
+        }
+
+        public static Error fromCode(final int code) {
+            switch (code) {
+                case 0:
+                    return NONE;
+                case 1:
+                    return INCOMPLETE_SOURCE_TOPIC_METADATA;
+                case 2:
+                    return VERSION_PROBING;
+                default:
+                    throw new IllegalArgumentException("Unknown error code: " + code);
+            }
+        }
+    }
 
     private static class AssignedPartition implements Comparable<AssignedPartition> {
         public final TaskId taskId;
@@ -185,7 +213,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
     private TaskManager taskManager;
     private PartitionGrouper partitionGrouper;
-    private AtomicBoolean versionProbingFlag;
+    private AtomicInteger assignmentErrorCode;
 
     protected int usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
 
@@ -250,20 +278,20 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         taskManager = (TaskManager) o;
 
-        final Object o2 = configs.get(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG);
-        if (o2 == null) {
-            final KafkaException fatalException = new KafkaException("VersionProbingFlag is not specified");
+        final Object ai = configs.get(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE);
+        if (ai == null) {
+            final KafkaException fatalException = new KafkaException("assignmentErrorCode is not specified");
             log.error(fatalException.getMessage(), fatalException);
             throw fatalException;
         }
 
-        if (!(o2 instanceof AtomicBoolean)) {
-            final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o2.getClass().getName(), AtomicBoolean.class.getName()));
+        if (!(ai instanceof AtomicInteger)) {
+            final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s",
+                ai.getClass().getName(), AtomicInteger.class.getName()));
             log.error(fatalException.getMessage(), fatalException);
             throw fatalException;
         }
-
-        versionProbingFlag = (AtomicBoolean) o2;
+        assignmentErrorCode = (AtomicInteger) ai;
 
         numStandbyReplicas = streamsConfig.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
@@ -319,6 +347,26 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
 
+    Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
+                                            final String topic,
+                                            final int errorCode) {
+        log.error("{} is unknown yet during rebalance," +
+            " please make sure they have been pre-created before starting the Streams application.", topic);
+        final Map<String, Assignment> assignment = new HashMap<>();
+        for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
+            for (final String consumerId : clientMetadata.consumers) {
+                assignment.put(consumerId, new Assignment(
+                    Collections.emptyList(),
+                    new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION,
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        errorCode).encode()
+                ));
+            }
+        }
+        return assignment;
+    }
     /*
      * This assigns tasks to consumer clients in the following steps.
      *
@@ -409,6 +457,12 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         final Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
         for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topic : topicsInfo.sourceTopics) {
+                if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
+                    !metadata.topics().contains(topic)) {
+                    return errorAssignment(clientsMetadata, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
+                }
+            }
             for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
                 repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
             }
@@ -438,12 +492,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                                         numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions;
                                     } else {
                                         numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (numPartitionsCandidate == null) {
-                                            repartitionTopicMetadata.get(topicName).numPartitions = NOT_AVAILABLE;
-                                        }
                                     }
 
-                                    if (numPartitionsCandidate != null && numPartitionsCandidate > numPartitions) {
+                                    if (numPartitionsCandidate > numPartitions) {
                                         numPartitions = numPartitionsCandidate;
                                     }
                                 }
@@ -582,7 +633,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         // construct the global partition assignment per host map
         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
-        if (minReceivedMetadataVersion == 2 || minReceivedMetadataVersion == 3) {
+        if (minReceivedMetadataVersion >= 2) {
             for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
                 final HostInfo hostInfo = entry.getValue().hostInfo;
 
@@ -658,7 +709,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 // finally, encode the assignment before sending back to coordinator
                 assignment.put(consumer, new Assignment(
                     activePartitions,
-                    new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode()));
+                    new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState, 0).encode()));
             }
         }
 
@@ -698,7 +749,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                         minUserMetadataVersion,
                         activeTasks,
                         standbyTasks,
-                        partitionsByHostState)
+                        partitionsByHostState,
+                        0)
                         .encode()
                 ));
             }
@@ -744,6 +796,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         Collections.sort(partitions, PARTITION_COMPARATOR);
 
         final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+        if (info.errCode() != Error.NONE.code) {
+            // set flag to shutdown streams app
+            assignmentErrorCode.set(info.errCode());
+            return;
+        }
         final int receivedAssignmentMetadataVersion = info.version();
         final int leaderSupportedVersion = info.latestSupportedVersion();
 
@@ -770,7 +827,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 usedSubscriptionMetadataVersion = leaderSupportedVersion;
             }
 
-            versionProbingFlag.set(true);
+            assignmentErrorCode.set(Error.VERSION_PROBING.code);
             return;
         }
 
@@ -801,6 +858,18 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                 partitionsByHost = info.partitionsByHost();
                 break;
+            case VERSION_FOUR:
+                if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
+                    log.info("Sent a version {} subscription and group leader's latest supported version is {}. " +
+                        "Upgrading subscription metadata version to {} for next rebalance.",
+                        usedSubscriptionMetadataVersion,
+                        leaderSupportedVersion,
+                        leaderSupportedVersion);
+                    usedSubscriptionMetadataVersion = leaderSupportedVersion;
+                }
+                processVersionFourAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+                partitionsByHost = info.partitionsByHost();
+                break;
             default:
                 throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/");
         }
@@ -854,6 +923,13 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
     }
 
+    private void processVersionFourAssignment(final AssignmentInfo info,
+                                              final List<TopicPartition> partitions,
+                                              final Map<TaskId, Set<TopicPartition>> activeTasks,
+                                              final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
+        processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+    }
+
     // for testing
     protected void processLatestVersionAssignment(final AssignmentInfo info,
                                                   final List<TopicPartition> partitions,
@@ -877,9 +953,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
             final InternalTopicConfig topic = metadata.config;
             final int numPartitions = metadata.numPartitions;
 
-            if (numPartitions == NOT_AVAILABLE) {
-                continue;
-            }
             if (numPartitions < 0) {
                 throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
             }
@@ -905,9 +978,12 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
     static class CopartitionedTopicsValidator {
         private final String logPrefix;
+        private Logger log;
 
         CopartitionedTopicsValidator(final String logPrefix) {
             this.logPrefix = logPrefix;
+            final LogContext logContext = new LogContext(logPrefix);
+            log = logContext.logger(getClass());
         }
 
         void validate(final Set<String> copartitionGroup,
@@ -918,9 +994,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
             for (final String topic : copartitionGroup) {
                 if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
                     final Integer partitions = metadata.partitionCountForTopic(topic);
-
                     if (partitions == null) {
-                        throw new org.apache.kafka.streams.errors.TopologyException(String.format("%sTopic not found: %s", logPrefix, topic));
+                        String str = String.format("%sTopic not found: %s", logPrefix, topic);
+                        log.error(str);
+                        throw new IllegalStateException(str);
                     }
 
                     if (numPartitions == UNKNOWN) {
@@ -930,9 +1007,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                         Arrays.sort(topics);
                         throw new org.apache.kafka.streams.errors.TopologyException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
                     }
-                } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
-                    numPartitions = NOT_AVAILABLE;
-                    break;
                 }
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index c577830..1179ca0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -41,11 +41,12 @@ public class AssignmentInfo {
 
     private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int LATEST_SUPPORTED_VERSION = 4;
     static final int UNKNOWN = -1;
 
     private final int usedVersion;
     private final int latestSupportedVersion;
+    private int errCode;
     private List<TaskId> activeTasks;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
     private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
@@ -55,27 +56,29 @@ public class AssignmentInfo {
                            final int latestSupportedVersion) {
         this.usedVersion = version;
         this.latestSupportedVersion = latestSupportedVersion;
+        this.errCode = 0;
     }
 
     public AssignmentInfo(final List<TaskId> activeTasks,
                           final Map<TaskId, Set<TopicPartition>> standbyTasks,
                           final Map<HostInfo, Set<TopicPartition>> hostState) {
-        this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
+        this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, 0);
     }
 
     public AssignmentInfo() {
         this(LATEST_SUPPORTED_VERSION,
             Collections.emptyList(),
             Collections.emptyMap(),
-            Collections.emptyMap());
+            Collections.emptyMap(),
+            0);
     }
 
     public AssignmentInfo(final int version,
                           final List<TaskId> activeTasks,
                           final Map<TaskId, Set<TopicPartition>> standbyTasks,
-                          final Map<HostInfo, Set<TopicPartition>> hostState) {
-        this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
-
+                          final Map<HostInfo, Set<TopicPartition>> hostState,
+                          final int errCode) {
+        this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, errCode);
         if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
             throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
                 + "; was: " + version);
@@ -87,18 +90,24 @@ public class AssignmentInfo {
                    final int latestSupportedVersion,
                    final List<TaskId> activeTasks,
                    final Map<TaskId, Set<TopicPartition>> standbyTasks,
-                   final Map<HostInfo, Set<TopicPartition>> hostState) {
+                   final Map<HostInfo, Set<TopicPartition>> hostState,
+                   final int errCode) {
         this.usedVersion = version;
         this.latestSupportedVersion = latestSupportedVersion;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
         this.partitionsByHost = hostState;
+        this.errCode = errCode;
     }
 
     public int version() {
         return usedVersion;
     }
 
+    public int errCode() {
+        return errCode;
+    }
+
     public int latestSupportedVersion() {
         return latestSupportedVersion;
     }
@@ -133,6 +142,9 @@ public class AssignmentInfo {
                 case 3:
                     encodeVersionThree(out);
                     break;
+                case 4:
+                    encodeVersionFour(out);
+                    break;
                 default:
                     throw new IllegalStateException("Unknown metadata version: " + usedVersion
                         + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
@@ -203,6 +215,14 @@ public class AssignmentInfo {
         encodePartitionsByHost(out);
     }
 
+    private void encodeVersionFour(final DataOutputStream out) throws IOException {
+        out.writeInt(4);
+        out.writeInt(LATEST_SUPPORTED_VERSION);
+        encodeActiveAndStandbyTaskAssignment(out);
+        encodePartitionsByHost(out);
+        out.writeInt(errCode);
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
      */
@@ -214,6 +234,7 @@ public class AssignmentInfo {
             final AssignmentInfo assignmentInfo;
 
             final int usedVersion = in.readInt();
+            int latestSupportedVersion;
             switch (usedVersion) {
                 case 1:
                     assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
@@ -224,10 +245,15 @@ public class AssignmentInfo {
                     decodeVersionTwoData(assignmentInfo, in);
                     break;
                 case 3:
-                    final int latestSupportedVersion = in.readInt();
+                    latestSupportedVersion = in.readInt();
                     assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
                     decodeVersionThreeData(assignmentInfo, in);
                     break;
+                case 4:
+                    latestSupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    decodeVersionFourData(assignmentInfo, in);
+                    break;
                 default:
                     final TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " +
                         "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
@@ -300,9 +326,16 @@ public class AssignmentInfo {
         decodeGlobalAssignmentData(assignmentInfo, in);
     }
 
+    private static void decodeVersionFourData(final AssignmentInfo assignmentInfo,
+                                              final DataInputStream in) throws IOException {
+        decodeVersionThreeData(assignmentInfo, in);
+        assignmentInfo.errCode = in.readInt();
+    }
+
     @Override
     public int hashCode() {
-        return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
+        return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode()
+            ^ partitionsByHost.hashCode() ^ errCode;
     }
 
     @Override
@@ -311,6 +344,7 @@ public class AssignmentInfo {
             final AssignmentInfo other = (AssignmentInfo) o;
             return usedVersion == other.usedVersion &&
                     latestSupportedVersion == other.latestSupportedVersion &&
+                    errCode == other.errCode &&
                     activeTasks.equals(other.activeTasks) &&
                     standbyTasks.equals(other.standbyTasks) &&
                     partitionsByHost.equals(other.partitionsByHost);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 4ebc956..b4ad19f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -32,7 +32,7 @@ public class SubscriptionInfo {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int LATEST_SUPPORTED_VERSION = 4;
     static final int UNKNOWN = -1;
 
     private final int usedVersion;
@@ -124,6 +124,9 @@ public class SubscriptionInfo {
             case 3:
                 buf = encodeVersionThree();
                 break;
+            case 4:
+                buf = encodeVersionFour();
+                break;
             default:
                 throw new IllegalStateException("Unknown metadata version: " + usedVersion
                     + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
@@ -205,7 +208,7 @@ public class SubscriptionInfo {
     private ByteBuffer encodeVersionThree() {
         final byte[] endPointBytes = prepareUserEndPoint();
 
-        final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+        final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
 
         buf.putInt(3); // used version
         buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
@@ -217,7 +220,22 @@ public class SubscriptionInfo {
         return buf;
     }
 
-    protected int getVersionThreeByteLength(final byte[] endPointBytes) {
+    private ByteBuffer encodeVersionFour() {
+        final byte[] endPointBytes = prepareUserEndPoint();
+
+        final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
+
+        buf.putInt(4); // used version
+        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
+        encodeUserEndPoint(buf, endPointBytes);
+
+        return buf;
+    }
+
+    protected int getVersionThreeAndFourByteLength(final byte[] endPointBytes) {
         return 4 + // used version
                4 + // latest supported version version
                16 + // client ID
@@ -247,6 +265,7 @@ public class SubscriptionInfo {
                 decodeVersionTwoData(subscriptionInfo, data);
                 break;
             case 3:
+            case 4:
                 latestSupportedVersion = data.getInt();
                 subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
                 decodeVersionThreeData(subscriptionInfo, data);
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java
new file mode 100644
index 0000000..9dd1fc1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Properties;
+
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+/**
+ *  This class exposes the {@link StreamThread}s of a {@link KafkaStreams} instance so that a {@link StreamThread.StateListener} can be registered on them.
+ *
+ */
+public class KafkaStreamsWrapper extends KafkaStreams {
+
+    public KafkaStreamsWrapper(final Topology topology,
+                               final Properties props) {
+        super(topology, props);
+    }
+
+    /**
+     * An app can set a single {@link StreamThread.StateListener} so that the app is notified when state changes.
+     *
+     * @param listener a new StreamThread state listener
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setStreamThreadStateListener(final StreamThread.StateListener listener) {
+        if (state == State.CREATED) {
+            for (final StreamThread thread : threads) {
+                thread.setStateListener(listener);
+            }
+        } else {
+            throw new IllegalStateException("Can only set StateListener in CREATED state. " +
+                "Current state is: " + state);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 2ab6639..a0b8f3d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import scala.Option;
@@ -61,6 +63,32 @@ public class IntegrationTestUtils {
     public static final long DEFAULT_TIMEOUT = 30 * 1000L;
     public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
 
+    /*
+     * Records state transition for StreamThread
+     */
+    public static class StateListenerStub implements StreamThread.StateListener {
+        boolean runningToRevokedSeen = false;
+        boolean revokedToPendingShutdownSeen = false;
+        @Override
+        public void onChange(final Thread thread,
+                             final ThreadStateTransitionValidator newState,
+                             final ThreadStateTransitionValidator oldState) {
+            if (oldState == StreamThread.State.RUNNING && newState == StreamThread.State.PARTITIONS_REVOKED) {
+                runningToRevokedSeen = true;
+            } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && newState == StreamThread.State.PENDING_SHUTDOWN) {
+                revokedToPendingShutdownSeen = true;
+            }
+        }
+
+        public boolean revokedToPendingShutdownSeen() {
+            return revokedToPendingShutdownSeen;
+        }
+
+        public boolean runningToRevokedSeen() {
+            return runningToRevokedSeen;
+        }
+    }
+
     /**
      * Removes local state stores.  Useful to reset state in-between integration test runs.
      *
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index de197fe..b1c3684 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -91,16 +91,14 @@ public class DefaultPartitionGrouperTest {
         assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata));
     }
 
-    @Test
+    @Test(expected = RuntimeException.class)
     public void shouldNotCreateAnyTasksBecauseOneTopicHasUnknownPartitions() {
         final PartitionGrouper grouper = new DefaultPartitionGrouper();
-        final Map<TaskId, Set<TopicPartition>> expectedPartitionsForTask = new HashMap<>();
         final Map<Integer, Set<String>> topicGroups = new HashMap<>();
-
+    
         final int topicGroupId = 0;
-
+    
         topicGroups.put(topicGroupId, mkSet("topic1", "unknownTopic", "topic2"));
-
-        assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata));
+        grouper.partitionGroups(topicGroups, metadata);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index b8221d8..a71f488 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -46,7 +46,7 @@ public class CopartitionedTopicsValidatorTest {
         partitions.put(new TopicPartition("second", 1), new PartitionInfo("second", 1, null, null, null));
     }
 
-    @Test(expected = TopologyException.class)
+    @Test(expected = IllegalStateException.class)
     public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
         validator.validate(Collections.singleton("topic"),
                            Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
@@ -98,27 +98,6 @@ public class CopartitionedTopicsValidatorTest {
         assertThat(three.numPartitions, equalTo(15));
     }
 
-    @Test
-    public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() {
-        final StreamsPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1);
-        final StreamsPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", StreamsPartitionAssignor.NOT_AVAILABLE);
-        final Map<String, StreamsPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig = new HashMap<>();
-
-        repartitionTopicConfig.put(one.config.name(), one);
-        repartitionTopicConfig.put(two.config.name(), two);
-
-        validator.validate(Utils.mkSet("first",
-                                       "second",
-                                       one.config.name(),
-                                       two.config.name()),
-                           repartitionTopicConfig,
-                           cluster.withPartitions(partitions));
-
-        assertThat(one.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE));
-        assertThat(two.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE));
-
-    }
-
     private StreamsPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic,
                                                                                final int partitions) {
         final InternalTopicConfig repartitionTopicConfig
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2ccc893..93d4e94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -82,7 +82,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -305,7 +305,7 @@ public class StreamThreadTest {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
@@ -339,7 +339,7 @@ public class StreamThreadTest {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
@@ -374,7 +374,7 @@ public class StreamThreadTest {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval + 1);
@@ -523,7 +523,7 @@ public class StreamThreadTest {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.setStateListener(
             new StreamThread.StateListener() {
@@ -560,7 +560,7 @@ public class StreamThreadTest {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.shutdown();
         EasyMock.verify(taskManager);
@@ -588,7 +588,7 @@ public class StreamThreadTest {
             internalTopologyBuilder,
             clientId,
             new LogContext(""),
-            new AtomicBoolean()
+            new AtomicInteger()
         );
         thread.shutdown();
         // Execute the run method. Verification of the mock will check that shutdown was only done once
@@ -1288,7 +1288,8 @@ public class StreamThreadTest {
                 internalTopologyBuilder,
                 clientId,
                 new LogContext(""),
-                new AtomicBoolean());
+                new AtomicInteger()
+                );
         final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>());
         final Metric testMetric = new KafkaMetric(
                 new Object(),
@@ -1331,7 +1332,8 @@ public class StreamThreadTest {
                 internalTopologyBuilder,
                 clientId,
                 new LogContext(""),
-                new AtomicBoolean());
+                new AtomicInteger()
+                );
         final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>());
         final Metric testMetric = new KafkaMetric(
                 new Object(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 4327e8f..2577bb8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -58,7 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
@@ -122,7 +122,7 @@ public class StreamsPartitionAssignorTest {
         configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
         configurationMap.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
-        configurationMap.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, new AtomicBoolean());
+        configurationMap.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger());
         return configurationMap;
     }
 
@@ -993,20 +993,9 @@ public class StreamsPartitionAssignorTest {
 
         final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
 
-        final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>();
-        expectedCreatedInternalTopics.put(applicationId + "-count-repartition", 3);
-        expectedCreatedInternalTopics.put(applicationId + "-count-changelog", 3);
-        assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics));
+        assertThat(mockInternalTopicManager.readyTopics.isEmpty(), equalTo(true));
 
-        final List<TopicPartition> expectedAssignment = Arrays.asList(
-            new TopicPartition("topic1", 0),
-            new TopicPartition("topic1", 1),
-            new TopicPartition("topic1", 2),
-            new TopicPartition(applicationId + "-count-repartition", 0),
-            new TopicPartition(applicationId + "-count-repartition", 1),
-            new TopicPartition(applicationId + "-count-repartition", 2)
-        );
-        assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment)));
+        assertThat(assignment.get(client).partitions().isEmpty(), equalTo(true));
     }
 
     @Test
@@ -1109,29 +1098,29 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldThrowKafkaExceptionVersionProbingFlagNotConfigured() {
+    public void shouldThrowKafkaExceptionAssignmentErrorCodeNotConfigured() {
         final Map<String, Object> config = configProps();
-        config.remove(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG);
+        config.remove(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE);
 
         try {
             partitionAssignor.configure(config);
             fail("Should have thrown KafkaException");
         } catch (final KafkaException expected) {
-            assertThat(expected.getMessage(), equalTo("VersionProbingFlag is not specified"));
+            assertThat(expected.getMessage(), equalTo("assignmentErrorCode is not specified"));
         }
     }
 
     @Test
-    public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicBoolean() {
+    public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicInteger() {
         final Map<String, Object> config = configProps();
-        config.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, "i am not an AtomicBoolean");
+        config.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, "i am not an AtomicInteger");
 
         try {
             partitionAssignor.configure(config);
             fail("Should have thrown KafkaException");
         } catch (final KafkaException expected) {
             assertThat(expected.getMessage(),
-                equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicBoolean"));
+                equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicInteger"));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index c7382e7..8b99065 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -59,33 +59,39 @@ public class AssignmentInfoTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowForUnknownVersion1() {
-        new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment);
+        new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment, 0);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowForUnknownVersion2() {
-        new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, globalAssignment);
+        new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, globalAssignment, 0);
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion1() {
-        final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment);
-        final AssignmentInfo expectedInfo = new AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.<HostInfo, Set<TopicPartition>>emptyMap());
+        final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment, 0);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.<HostInfo, Set<TopicPartition>>emptyMap(), 0);
         assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion2() {
-        final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment);
-        final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment);
+        final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment, 0);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment, 0);
         assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion3() {
-        final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment);
-        final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment);
+        final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment, 0);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 0);
         assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion4() {
+        final AssignmentInfo info = new AssignmentInfo(4, activeTasks, standbyTasks, globalAssignment, 2);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(4, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2);
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+    }
 }
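
Note on the new trailing constructor argument: it carries an error code (0 meaning none, 2 in the version-4 test above) that the leader sets when it cannot complete the assignment. A hedged sketch of how a member might react to it, matching the self-shutdown exercised by StreamToTableJoinWithIncompleteMetadataIntegrationTest further down; the errCode() accessor and the shutdown() helper are illustrative stand-ins, since this hunk only shows the constructor:

    import java.nio.ByteBuffer;
    import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;

    public final class AssignmentErrorCheckSketch {

        // Illustrative stand-in for the streams shutdown path.
        private static void shutdown() {
            System.err.println("leader could not complete assignment; shutting down");
        }

        // Decode the assignment userData and bail out on a non-zero error code
        // instead of rebalancing forever. errCode() is an assumed accessor name.
        public static void onAssignment(final ByteBuffer assignmentUserData) {
            final AssignmentInfo info = AssignmentInfo.decode(assignmentUserData);
            if (info.errCode() != 0) {
                shutdown();
            }
        }
    }
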
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 0611bfc..2a75c57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -77,6 +77,13 @@ public class SubscriptionInfoTest {
     }
 
     @Test
+    public void shouldEncodeAndDecodeVersion4() {
+        final SubscriptionInfo info = new SubscriptionInfo(4, processId, activeTasks, standbyTasks, "localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(4, SubscriptionInfo.LATEST_SUPPORTED_VERSION, processId, activeTasks, standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
+    }
+
+    @Test
     public void shouldAllowToDecodeFutureSupportedVersion() {
         final SubscriptionInfo info = SubscriptionInfo.decode(encodeFutureVersion());
         assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.version());
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 1b01a73..33e9b97 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -274,7 +274,7 @@ public class StreamsUpgradeTest {
         private ByteBuffer encodeFutureVersion() {
             final byte[] endPointBytes = prepareUserEndPoint();
 
-            final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+            final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
 
             buf.putInt(LATEST_SUPPORTED_VERSION + 1); // used version
             buf.putInt(LATEST_SUPPORTED_VERSION + 1); // supported version
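
For context, version probing hinges on the two leading ints of the subscription userData: the version actually used and the highest version the sender supports. A minimal, self-contained sketch of just that header, assuming LATEST_SUPPORTED_VERSION is 4 as of this commit (the real encodeFutureVersion() also appends the process id, task sets, and endpoint bytes):

    import java.nio.ByteBuffer;

    public class FutureVersionHeaderSketch {
        static final int LATEST_SUPPORTED_VERSION = 4; // value as of this commit

        // Writes only the two-int header that the leader inspects during
        // version probing; both ints are bumped past the latest known version.
        static ByteBuffer encodeHeader() {
            final ByteBuffer buf = ByteBuffer.allocate(8);
            buf.putInt(LATEST_SUPPORTED_VERSION + 1); // used version
            buf.putInt(LATEST_SUPPORTED_VERSION + 1); // supported version
            buf.rewind();
            return buf;
        }

        public static void main(final String[] args) {
            final ByteBuffer buf = encodeHeader();
            System.out.println("used=" + buf.getInt() + " supported=" + buf.getInt());
        }
    }
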
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
new file mode 100644
index 0000000..78ed591
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
@@ -0,0 +1,137 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.streams._
+import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
+import org.apache.kafka.streams.processor.internals.StreamThread
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.test.TestUtils
+import org.junit.Assert._
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Shared test-suite base that prepares the embedded Kafka cluster, topics, and
+ * configuration used by the Scala stream-table join integration tests.
+ */
+class StreamToTableJoinScalaIntegrationTestBase extends JUnitSuite with StreamToTableJoinTestData {
+
+  private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
+
+  @Rule def cluster: EmbeddedKafkaCluster = privateCluster
+
+  final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
+  val mockTime: MockTime = cluster.time
+  mockTime.setCurrentTimeMs(alignedTime)
+
+  val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
+  @Rule def testFolder: TemporaryFolder = tFolder
+
+  @Before
+  def startKafkaCluster(): Unit = {
+    cluster.createTopic(userClicksTopic)
+    cluster.createTopic(userRegionsTopic)
+    cluster.createTopic(outputTopic)
+    cluster.createTopic(userClicksTopicJ)
+    cluster.createTopic(userRegionsTopicJ)
+    cluster.createTopic(outputTopicJ)
+  }
+
+  def getStreamsConfiguration(): Properties = {
+    val streamsConfiguration: Properties = new Properties()
+
+    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-scala-integration-test")
+    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")
+    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
+
+    streamsConfiguration
+  }
+
+  private def getUserRegionsProducerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ProducerConfig.ACKS_CONFIG, "all")
+    p.put(ProducerConfig.RETRIES_CONFIG, "0")
+    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p
+  }
+
+  private def getUserClicksProducerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ProducerConfig.ACKS_CONFIG, "all")
+    p.put(ProducerConfig.RETRIES_CONFIG, "0")
+    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
+    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[LongSerializer])
+    p
+  }
+
+  private def getConsumerConfig(): Properties = {
+    val p = new Properties()
+    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
+    p.put(ConsumerConfig.GROUP_ID_CONFIG, "join-scala-integration-test-standard-consumer")
+    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
+    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer])
+    p
+  }
+
+  def produceNConsume(userClicksTopic: String,
+                      userRegionsTopic: String,
+                      outputTopic: String,
+                      waitTillRecordsReceived: Boolean = true): java.util.List[KeyValue[String, Long]] = {
+
+    import collection.JavaConverters._
+
+    // Publish user-region information.
+    val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
+    IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
+                                                       userRegions.asJava,
+                                                       userRegionsProducerConfig,
+                                                       mockTime,
+                                                       false)
+
+    // Publish user-click information.
+    val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
+    IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
+                                                       userClicks.asJava,
+                                                       userClicksProducerConfig,
+                                                       mockTime,
+                                                       false)
+
+    if (waitTillRecordsReceived) {
+      // consume and verify result
+      val consumerConfig = getConsumerConfig()
+
+      IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedClicksPerRegion.size)
+    } else {
+      java.util.Collections.emptyList()
+    }
+  }
+}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 7891131..e5253f9 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.streams._
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
+import org.apache.kafka.streams.processor.internals.StreamThread
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.test.TestUtils
@@ -41,28 +42,7 @@ import org.scalatest.junit.JUnitSuite
  * Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11.
  * Hence the native Java API based version is more verbose.
  */
-class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite with StreamToTableJoinTestData {
-
-  private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)
-
-  @Rule def cluster: EmbeddedKafkaCluster = privateCluster
-
-  final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
-  val mockTime: MockTime = cluster.time
-  mockTime.setCurrentTimeMs(alignedTime)
-
-  val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory())
-  @Rule def testFolder: TemporaryFolder = tFolder
-
-  @Before
-  def startKafkaCluster(): Unit = {
-    cluster.createTopic(userClicksTopic)
-    cluster.createTopic(userRegionsTopic)
-    cluster.createTopic(outputTopic)
-    cluster.createTopic(userClicksTopicJ)
-    cluster.createTopic(userRegionsTopicJ)
-    cluster.createTopic(outputTopicJ)
-  }
+class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJoinScalaIntegrationTestBase {
 
   @Test def testShouldCountClicksPerRegion(): Unit = {
 
@@ -101,7 +81,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite wit
 
     val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
       produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
-
     streams.close()
 
     import collection.JavaConverters._
@@ -172,74 +151,4 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite wit
     streams.close()
     assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key))
   }
-
-  private def getStreamsConfiguration(): Properties = {
-    val streamsConfiguration: Properties = new Properties()
-
-    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-scala-integration-test")
-    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
-    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")
-    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath)
-
-    streamsConfiguration
-  }
-
-  private def getUserRegionsProducerConfig(): Properties = {
-    val p = new Properties()
-    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
-    p.put(ProducerConfig.ACKS_CONFIG, "all")
-    p.put(ProducerConfig.RETRIES_CONFIG, "0")
-    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
-    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
-    p
-  }
-
-  private def getUserClicksProducerConfig(): Properties = {
-    val p = new Properties()
-    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
-    p.put(ProducerConfig.ACKS_CONFIG, "all")
-    p.put(ProducerConfig.RETRIES_CONFIG, "0")
-    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
-    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[LongSerializer])
-    p
-  }
-
-  private def getConsumerConfig(): Properties = {
-    val p = new Properties()
-    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
-    p.put(ConsumerConfig.GROUP_ID_CONFIG, "join-scala-integration-test-standard-consumer")
-    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
-    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer])
-    p
-  }
-
-  private def produceNConsume(userClicksTopic: String,
-                              userRegionsTopic: String,
-                              outputTopic: String): java.util.List[KeyValue[String, Long]] = {
-
-    import collection.JavaConverters._
-
-    // Publish user-region information.
-    val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
-    IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
-                                                       userRegions.asJava,
-                                                       userRegionsProducerConfig,
-                                                       mockTime,
-                                                       false)
-
-    // Publish user-click information.
-    val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
-    IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
-                                                       userClicks.asJava,
-                                                       userClicksProducerConfig,
-                                                       mockTime,
-                                                       false)
-
-    // consume and verify result
-    val consumerConfig = getConsumerConfig()
-
-    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedClicksPerRegion.size)
-  }
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
new file mode 100644
index 0000000..f5a098b
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import java.util.Properties
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization._
+import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.streams._
+import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
+import org.apache.kafka.streams.processor.internals.StreamThread
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.test.TestUtils
+import org.junit.Assert._
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Test suite verifying that StreamThread shuts itself down when metadata for the
+ * input topics is incomplete during a stream-table join.
+ */
+class StreamToTableJoinWithIncompleteMetadataIntegrationTest extends StreamToTableJoinScalaIntegrationTestBase {
+
+  @Test def testShouldAutoShutdownOnIncompleteMetadata(): Unit = {
+
+    // The Serdes import brings into scope implicit serdes (mostly for primitives) that set up all Serialized,
+    // Produced, Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or
+    // Joined will get these instances automatically.
+    import Serdes._
+
+    val streamsConfiguration: Properties = getStreamsConfiguration()
+
+    val builder = new StreamsBuilder()
+
+    val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
+
+    val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic + "1") // deliberately unknown topic
+
+    // Build the clicks-per-region aggregation from the stream-table join.
+    val clicksPerRegion: KTable[String, Long] =
+      userClicksStream
+
+        // Join the stream against the table.
+        .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
+
+        // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+        .map((_, regionWithClicks) => regionWithClicks)
+
+        // Compute the total per region by summing the individual click counts per region.
+        .groupByKey
+        .reduce(_ + _)
+
+    // Write the (continuously updating) results to the output topic.
+    clicksPerRegion.toStream.to(outputTopic)
+
+    val streams: KafkaStreamsWrapper = new KafkaStreamsWrapper(builder.build(), streamsConfiguration)
+    val listener = new IntegrationTestUtils.StateListenerStub()
+    streams.setStreamThreadStateListener(listener)
+    streams.start()
+
+    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
+      produceNConsume(userClicksTopic, userRegionsTopic, outputTopic, false)
+    while (!listener.revokedToPendingShutdownSeen()) {
+      Thread.sleep(3)
+    }
+    streams.close()
+    assertTrue(listener.runningToRevokedSeen())
+    assertTrue(listener.revokedToPendingShutdownSeen())
+  }
+}
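
For reference, the same detection pattern rendered in Java, using the KafkaStreamsWrapper and IntegrationTestUtils.StateListenerStub added elsewhere in this commit. A hedged sketch: the builder passed in is assumed to describe a topology whose source topics are not all known:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreamsWrapper;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

    public final class IncompleteMetadataShutdownSketch {

        // builder must describe a topology with at least one unknown source topic.
        public static void awaitSelfShutdown(final StreamsBuilder builder,
                                             final Properties props) throws InterruptedException {
            final KafkaStreamsWrapper streams = new KafkaStreamsWrapper(builder.build(), props);
            final IntegrationTestUtils.StateListenerStub listener =
                new IntegrationTestUtils.StateListenerStub();
            streams.setStreamThreadStateListener(listener);
            streams.start();

            // Instead of rebalancing forever, every thread walks
            // RUNNING -> PARTITIONS_REVOKED -> PENDING_SHUTDOWN.
            while (!listener.revokedToPendingShutdownSeen()) {
                Thread.sleep(3);
            }
            streams.close();
        }
    }
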
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 4113467..4d7215f 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -510,22 +510,22 @@ class StreamsUpgradeTest(Test):
                     monitors[first_other_processor] = first_other_monitor
                     monitors[second_other_processor] = second_other_monitor
 
-                    leader_monitor.wait_until("Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).",
+                    leader_monitor.wait_until("Received a future (version probing) subscription (version: 5). Sending empty assignment back (with supported version 4).",
                                               timeout_sec=60,
                                               err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account))
 
                     if len(self.old_processors) > 0:
-                        log_monitor.wait_until("Sent a version 4 subscription and got version 3 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.",
+                        log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
                     else:
-                        log_monitor.wait_until("Sent a version 4 subscription and got version 3 assignment back (successful version probing). Setting subscription metadata to leaders supported version 4 and trigger new rebalance.",
+                        log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Setting subscription metadata to leaders supported version 5 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account))
-                        first_other_monitor.wait_until("Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.",
+                        first_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.",
                                                        timeout_sec=60,
                                                       err_msg="Never saw output 'Upgrade metadata to version 5' on " + str(first_other_node.account))
-                        second_other_monitor.wait_until("Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.",
+                        second_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.",
                                                         timeout_sec=60,
                                                        err_msg="Never saw output 'Upgrade metadata to version 5' on " + str(second_other_node.account))
 
@@ -553,6 +553,6 @@ class StreamsUpgradeTest(Test):
 
     def verify_metadata_no_upgraded_yet(self):
         for p in self.processors:
-            found = list(p.node.account.ssh_capture("grep \"Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
+            found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
             if len(found) > 0:
                raise Exception("Kafka Streams failed with 'group member upgraded to metadata 5 too early'")
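
Read together, the log lines this system test asserts reduce to one negotiation rule. A hedged sketch under that reading (illustrative class and method names, not the actual StreamsPartitionAssignor code):

    public final class VersionProbingSketch {

        private static final int LATEST_SUPPORTED_VERSION = 5; // this member's own maximum, as of this commit

        // sent: subscription version this member used; received: version of the
        // assignment that came back; leaderSupported: the leader's advertised maximum.
        static int nextSubscriptionVersion(final int sent, final int received, final int leaderSupported) {
            if (received < sent) {
                // Version probing: the leader answered with an older (empty) assignment.
                // Fall back to the leader's supported version, which already equals ours
                // if the leader itself was upgraded in the meantime.
                return Math.min(leaderSupported, LATEST_SUPPORTED_VERSION);
            }
            // No probing needed, but upgrade when the leader advertises a newer version.
            return Math.min(Math.max(sent, leaderSupported), LATEST_SUPPORTED_VERSION);
        }
    }

With the values from this test: an upgraded member (sent=5) facing a v4 leader gets received=4 and leaderSupported=4, so it downgrades to 4; once the leader is upgraded it advertises 5, and every member converges on 5 at the next rebalance.
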

