kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [4/5] kafka git commit: KAFKA-5702; extract refactor StreamThread
Date Fri, 11 Aug 2017 11:14:10 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c157531..099682b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -40,20 +40,18 @@ import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.StateRestoreListener;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -64,14 +62,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
 
 import static java.util.Collections.singleton;
 
-public class StreamThread extends Thread {
+public class StreamThread extends Thread implements ThreadDataProvider {
 
     private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
     private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
@@ -157,60 +152,48 @@ public class StreamThread extends Thread {
         void onChange(final Thread thread, final ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator oldState);
     }
 
-    private class RebalanceListener implements ConsumerRebalanceListener {
-        private final Time time;
-        private final int requestTimeOut;
 
-        RebalanceListener(final Time time, final int requestTimeOut) {
+    static class RebalanceListener implements ConsumerRebalanceListener {
+        private final Time time;
+        private final TaskManager taskManager;
+        private final StreamThread streamThread;
+        private final String logPrefix;
+
+        RebalanceListener(final Time time,
+                          final TaskManager taskManager,
+                          final StreamThread streamThread,
+                          final String logPrefix) {
             this.time = time;
-            this.requestTimeOut = requestTimeOut;
+            this.taskManager = taskManager;
+            this.streamThread = streamThread;
+            this.logPrefix = logPrefix;
         }
 
         @Override
         public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
-            log.debug("{} at state {}: new partitions {} assigned at the end of consumer rebalance.\n" +
-                    "\tassigned active tasks: {}\n" +
-                    "\tassigned standby tasks: {}\n" +
-                    "\tcurrent suspended active tasks: {}\n" +
-                    "\tcurrent suspended standby tasks: {}\n" +
-                    "\tprevious active tasks: {}",
-                logPrefix,
-                state,
-                assignment,
-                partitionAssignor.activeTasks().keySet(),
-                partitionAssignor.standbyTasks().keySet(),
-                suspendedTasks.keySet(),
-                suspendedStandbyTasks.keySet(),
-                prevActiveTasks);
-
             final long start = time.milliseconds();
             try {
-                storeChangelogReader =
-                    new StoreChangelogReader(getName(), restoreConsumer, time, requestTimeOut,
-                                             globalStateRestoreListener);
-                setState(State.ASSIGNING_PARTITIONS);
-                // do this first as we may have suspended standby tasks that
-                // will become active or vice versa
-                closeNonAssignedSuspendedStandbyTasks();
-                closeNonAssignedSuspendedTasks();
-                addStreamTasks(assignment, start);
-                storeChangelogReader.restore();
-                addStandbyTasks(start);
-                streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
-                setState(State.RUNNING);
+                streamThread.setState(State.ASSIGNING_PARTITIONS);
+                taskManager.createTasks(assignment);
+                final RuntimeException exception = streamThread.unAssignChangeLogPartitions();
+                if (exception != null) {
+                    throw exception;
+                }
+                streamThread.refreshMetadataState();
+                streamThread.setState(State.RUNNING);
             } catch (final Throwable t) {
-                rebalanceException = t;
+                streamThread.setRebalanceException(t);
                 throw t;
             } finally {
                 log.info("{} partition assignment took {} ms.\n" +
-                        "\tcurrent active tasks: {}\n" +
-                        "\tcurrent standby tasks: {}\n" +
-                        "\tprevious active tasks: {}\n",
-                    logPrefix,
-                    time.milliseconds() - start,
-                    activeTasks.keySet(),
-                    standbyTasks.keySet(),
-                    prevActiveTasks);
+                                 "\tcurrent active tasks: {}\n" +
+                                 "\tcurrent standby tasks: {}\n" +
+                                 "\tprevious active tasks: {}\n",
+                         logPrefix,
+                         time.milliseconds() - start,
+                         taskManager.activeTaskIds(),
+                         taskManager.standbyTaskIds(),
+                         taskManager.prevActiveTaskIds());
             }
         }
 
@@ -220,39 +203,74 @@ public class StreamThread extends Thread {
                     "\tcurrent assigned active tasks: {}\n" +
                     "\tcurrent assigned standby tasks: {}\n",
                 logPrefix,
-                state,
+                streamThread.state,
                 assignment,
-                activeTasks.keySet(), standbyTasks.keySet());
+                taskManager.activeTaskIds(),
+                taskManager.standbyTaskIds());
 
             final long start = time.milliseconds();
             try {
-                setState(State.PARTITIONS_REVOKED);
+                streamThread.setState(State.PARTITIONS_REVOKED);
                 // suspend active tasks
-                suspendTasksAndState();
+                taskManager.suspendTasksAndState();
             } catch (final Throwable t) {
-                rebalanceException = t;
+                streamThread.setRebalanceException(t);
                 throw t;
             } finally {
-                streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata());
-                removeStreamTasks();
-                removeStandbyTasks();
+                streamThread.refreshMetadataState();
+                taskManager.removeTasks();
+                streamThread.clearStandbyRecords();
 
                 log.info("{} partition revocation took {} ms.\n" +
                         "\tsuspended active tasks: {}\n" +
                         "\tsuspended standby tasks: {}",
                     logPrefix,
                     time.milliseconds() - start,
-                    suspendedTasks.keySet(),
-                    suspendedStandbyTasks.keySet());
+                    taskManager.suspendedActiveTaskIds(),
+                    taskManager.suspendedStandbyTaskIds());
             }
         }
     }
 
-    abstract class AbstractTaskCreator {
+
+    static abstract class AbstractTaskCreator {
         final static long MAX_BACKOFF_TIME_MS = 1000L;
-        void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated, final long start) {
+        private final long rebalanceTimeoutMs;
+        final String applicationId;
+        final InternalTopologyBuilder builder;
+        final StreamsConfig config;
+        final StreamsMetrics streamsMetrics;
+        final StateDirectory stateDirectory;
+        final Sensor taskCreatedSensor;
+        final ChangelogReader storeChangelogReader;
+        final Time time;
+        final String logPrefix;
+
+        AbstractTaskCreator(final InternalTopologyBuilder builder,
+                            final StreamsConfig config,
+                            final StreamsMetrics streamsMetrics,
+                            final StateDirectory stateDirectory,
+                            final Sensor taskCreatedSensor,
+                            final ChangelogReader storeChangelogReader,
+                            final Time time,
+                            final long rebalanceTimeoutMs,
+                            final String logPrefix) {
+            this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+            this.builder = builder;
+            this.config = config;
+            this.streamsMetrics = streamsMetrics;
+            this.stateDirectory = stateDirectory;
+            this.taskCreatedSensor = taskCreatedSensor;
+            this.storeChangelogReader = storeChangelogReader;
+            this.time = time;
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            this.logPrefix = logPrefix;
+        }
+
+        Map<Task, Set<TopicPartition>> retryWithBackoff(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated, final long start) {
             long backoffTimeMs = 50L;
             final Set<TaskId> retryingTasks = new HashSet<>();
+            final Map<Task, Set<TopicPartition>> createdTasks = new HashMap<>();
             while (true) {
                 final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> it = tasksToBeCreated.entrySet().iterator();
                 while (it.hasNext()) {
@@ -261,7 +279,11 @@ public class StreamThread extends Thread {
                     final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
 
                     try {
-                        createTask(taskId, partitions);
+                        final Task task = createTask(consumer, taskId, partitions);
+                        if (task != null) {
+                            log.trace("{} Created task {} with assigned partitions {}", logPrefix, taskId, partitions);
+                            createdTasks.put(task, partitions);
+                        }
                         it.remove();
                         backoffTimeMs = 50L;
                         retryingTasks.remove(taskId);
@@ -286,48 +308,138 @@ public class StreamThread extends Thread {
                     // ignore
                 }
             }
+            return createdTasks;
         }
 
-        abstract void createTask(final TaskId id, final Set<TopicPartition> partitions);
+        abstract Task createTask(final Consumer<byte[], byte[]> consumer, final TaskId id, final Set<TopicPartition> partitions);
+
+        public void close() {}
     }
 
-    class TaskCreator extends AbstractTaskCreator {
+    static class TaskCreator extends AbstractTaskCreator {
+        private final ThreadCache cache;
+        private final KafkaClientSupplier clientSupplier;
+        private final String threadClientId;
+        private final Producer<byte[], byte[]> threadProducer;
+
+
+        TaskCreator(final InternalTopologyBuilder builder,
+                    final StreamsConfig config,
+                    final StreamsMetrics streamsMetrics,
+                    final StateDirectory stateDirectory,
+                    final Sensor taskCreatedSensor,
+                    final ChangelogReader storeChangelogReader,
+                    final ThreadCache cache,
+                    final Time time,
+                    final KafkaClientSupplier clientSupplier,
+                    final Producer<byte[], byte[]> threadProducer,
+                    final String threadClientId,
+                    final long rebalanceTimeoutMs,
+                    final String logPrefix) {
+            super(
+                    builder,
+                  config,
+                  streamsMetrics,
+                  stateDirectory,
+                  taskCreatedSensor,
+                  storeChangelogReader,
+                  time,
+                  rebalanceTimeoutMs,
+                  logPrefix);
+            this.cache = cache;
+            this.clientSupplier = clientSupplier;
+            this.threadProducer = threadProducer;
+            this.threadClientId = threadClientId;
+        }
+
         @Override
-        void createTask(final TaskId taskId, final Set<TopicPartition> partitions) {
-            final StreamTask task = createStreamTask(taskId, partitions);
+        StreamTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
+            taskCreatedSensor.record();
+
+            return new StreamTask(
+                    taskId,
+                    applicationId,
+                    partitions,
+                    builder.build(taskId.topicGroupId),
+                    consumer,
+                    storeChangelogReader,
+                    config,
+                    streamsMetrics,
+                    stateDirectory,
+                    cache,
+                    time,
+                    createProducer(taskId));
 
-            activeTasks.put(taskId, task);
+        }
 
-            for (final TopicPartition partition : partitions) {
-                activeTasksByPartition.put(partition, task);
+        @Override
+        public void close() {
+            if (threadProducer != null) {
+                try {
+                    threadProducer.close();
+                } catch (final Throwable e) {
+                    log.error("{} Failed to close producer due to the following error:", logPrefix, e);
+                }
             }
         }
-    }
 
-    class StandbyTaskCreator extends AbstractTaskCreator {
-        private final Map<TopicPartition, Long> checkpointedOffsets;
+        private Producer<byte[], byte[]> createProducer(final TaskId id) {
+            // eos
+            if (threadProducer == null) {
+                final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId + "-" + id);
+                log.info("{} Creating producer client for task {}", logPrefix, id);
+                producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + id);
+                return clientSupplier.getProducer(producerConfigs);
+            }
+
+            return threadProducer;
 
-        StandbyTaskCreator(final Map<TopicPartition, Long> checkpointedOffsets) {
-            this.checkpointedOffsets = checkpointedOffsets;
+        }
+    }
+
+    static class StandbyTaskCreator extends AbstractTaskCreator {
+        StandbyTaskCreator(final InternalTopologyBuilder builder,
+                           final StreamsConfig config,
+                           final StreamsMetrics streamsMetrics,
+                           final StateDirectory stateDirectory,
+                           final Sensor taskCreatedSensor,
+                           final ChangelogReader storeChangelogReader,
+                           final Time time,
+                           final long rebalanceTimeoutMs,
+                           final String logPrefix) {
+            super(builder,
+                  config,
+                  streamsMetrics,
+                  stateDirectory,
+                  taskCreatedSensor,
+                  storeChangelogReader,
+                  time,
+                  rebalanceTimeoutMs,
+                  logPrefix);
         }
 
         @Override
-        void createTask(final TaskId taskId, final Set<TopicPartition> partitions) {
-            final StandbyTask task = createStandbyTask(taskId, partitions);
-            updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
+        StandbyTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
+            taskCreatedSensor.record();
+
+            final ProcessorTopology topology = builder.build(taskId.topicGroupId);
+
+            if (!topology.stateStores().isEmpty()) {
+                return new StandbyTask(taskId, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
+            } else {
+                log.trace("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, taskId, partitions);
+
+                return null;
+            }
         }
-    }
 
-    interface StreamTaskAction {
-        String name();
-        void apply(final StreamTask task);
     }
 
     /**
      * This class extends {@link StreamsMetricsImpl(Metrics, String, String, Map)} and
      * overrides one of its functions for efficiency
      */
-    private class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
+    static class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
         final Sensor commitTimeSensor;
         final Sensor pollTimeSensor;
         final Sensor processTimeSensor;
@@ -369,12 +481,6 @@ public class StreamThread extends Thread {
 
         }
 
-
-        @Override
-        public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
-            sensor.record(endNs - startNs, timerStartedMs);
-        }
-
         void removeAllSensors() {
             removeSensor(commitTimeSensor);
             removeSensor(pollTimeSensor);
@@ -388,93 +494,69 @@ public class StreamThread extends Thread {
     }
 
 
-    private volatile State state = State.CREATED;
     private final Object stateLock = new Object();
-
-    private StreamThread.StateListener stateListener = null;
-    final PartitionGrouper partitionGrouper;
     private final StreamsMetadataState streamsMetadataState;
-    public final String applicationId;
-    public final String clientId;
-    public final UUID processId;
-
-    protected final StreamsConfig config;
-    protected final InternalTopologyBuilder builder;
-    Producer<byte[], byte[]> threadProducer;
-    private final KafkaClientSupplier clientSupplier;
-    protected final Consumer<byte[], byte[]> consumer;
-    final Consumer<byte[], byte[]> restoreConsumer;
-
     private final String logPrefix;
-    private final String threadClientId;
-    private final Pattern sourceTopicPattern;
-    private final Map<TaskId, StreamTask> activeTasks;
-    private final Map<TaskId, StandbyTask> standbyTasks;
-    private final Map<TopicPartition, StreamTask> activeTasksByPartition;
-    private final Map<TopicPartition, StandbyTask> standbyTasksByPartition;
-    private final Set<TaskId> prevActiveTasks;
-    private final Map<TaskId, StreamTask> suspendedTasks;
-    private final Map<TaskId, StandbyTask> suspendedStandbyTasks;
+    private final TaskManager taskManager;
     private final Time time;
-    private final int rebalanceTimeoutMs;
     private final long pollTimeMs;
     private final long commitTimeMs;
+    private final PartitionGrouper partitionGrouper;
+    private final UUID processId;
+    private final StateDirectory stateDirectory;
     private final StreamsMetricsThreadImpl streamsMetrics;
-    // TODO: this is not private only for tests, should be better refactored
-    final StateDirectory stateDirectory;
+
+    private long lastCommitMs;
     private String originalReset;
-    private StreamPartitionAssignor partitionAssignor;
+    private ThreadMetadataProvider metadataProvider;
+    private boolean processStandbyRecords = false;
+    private Throwable rebalanceException;
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords = new HashMap<>();
+    private StreamThread.StateListener stateListener;
+    private volatile State state = State.CREATED;
     private long timerStartedMs;
-    private long lastCommitMs;
-    private Throwable rebalanceException = null;
-    private final boolean eosEnabled;
 
-    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
-    private boolean processStandbyRecords = false;
+    final StreamsConfig config;
+    final ConsumerRebalanceListener rebalanceListener;
+    final Consumer<byte[], byte[]> restoreConsumer;
 
-    private final ThreadCache cache;
-    private StoreChangelogReader storeChangelogReader;
+    protected final Consumer<byte[], byte[]> consumer;
+    protected final InternalTopologyBuilder builder;
 
-    private final TaskCreator taskCreator = new TaskCreator();
+    public final String applicationId;
+    public final String clientId;
 
-    final ConsumerRebalanceListener rebalanceListener;
-    private StateRestoreListener globalStateRestoreListener;
     private final static int UNLIMITED_RECORDS = -1;
 
     public StreamThread(final InternalTopologyBuilder builder,
-                        final StreamsConfig config,
-                        final KafkaClientSupplier clientSupplier,
-                        final String applicationId,
                         final String clientId,
+                        final String threadClientId,
+                        final StreamsConfig config,
                         final UUID processId,
-                        final Metrics metrics,
                         final Time time,
                         final StreamsMetadataState streamsMetadataState,
-                        final long cacheSizeBytes,
+                        final TaskManager taskManager,
+                        final StreamsMetricsThreadImpl streamsMetrics,
+                        final KafkaClientSupplier clientSupplier,
+                        final Consumer<byte[], byte[]> restoreConsumer,
                         final StateDirectory stateDirectory) {
-        super(clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
-        this.applicationId = applicationId;
-        this.config = config;
+        super(threadClientId);
         this.builder = builder;
-        this.clientSupplier = clientSupplier;
-        sourceTopicPattern = builder.sourceTopicPattern();
         this.clientId = clientId;
+        this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+        this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+        this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.processId = processId;
-        partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
+        this.time = time;
         this.streamsMetadataState = streamsMetadataState;
-        threadClientId = getName();
-        logPrefix = String.format("stream-thread [%s]", threadClientId);
-
-        streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId,
-            Collections.singletonMap("client-id", threadClientId));
-        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
-            log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes", logPrefix);
-        }
-        cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
-        eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
-
-
-        // set the consumer clients
+        this.taskManager = taskManager;
+        this.logPrefix = logPrefix(threadClientId);
+        this.streamsMetrics = streamsMetrics;
+        this.restoreConsumer = restoreConsumer;
+        this.stateDirectory = stateDirectory;
+        this.rebalanceListener = new RebalanceListener(time, taskManager, this, logPrefix);
+        this.config = config;
+        this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
         log.info("{} Creating consumer client", logPrefix);
         final Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, threadClientId);
 
@@ -483,34 +565,101 @@ public class StreamThread extends Thread {
             log.info("{} Custom offset resets specified updating configs original auto offset reset {}", logPrefix, originalReset);
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         }
+        this.consumer = clientSupplier.getConsumer(consumerConfigs);
+        taskManager.setConsumer(consumer);
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    public static StreamThread create(final InternalTopologyBuilder builder,
+                                      final StreamsConfig config,
+                                      final KafkaClientSupplier clientSupplier,
+                                      final UUID processId,
+                                      final String clientId,
+                                      final Metrics metrics,
+                                      final Time time,
+                                      final StreamsMetadataState streamsMetadataState,
+                                      final long cacheSizeBytes,
+                                      final StateDirectory stateDirectory,
+                                      final StateRestoreListener stateRestoreListener) {
+
+        final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
+        final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(metrics,
+                                                                                     "stream-metrics",
+                                                                                     "thread." + threadClientId,
+                                                                                     Collections.singletonMap("client-id",
+                                                                                                              threadClientId));
+
+        final String logPrefix = logPrefix(threadClientId);
+        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
+            log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes", logPrefix);
+        }
+        final ThreadCache cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
+
+        final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
 
-        consumer = clientSupplier.getConsumer(consumerConfigs);
         log.info("{} Creating restore consumer client", logPrefix);
-        restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId));
-        // initialize the task list
-        // activeTasks needs to be concurrent as it can be accessed
-        // by QueryableState
-        activeTasks = new ConcurrentHashMap<>();
-        standbyTasks = new HashMap<>();
-        activeTasksByPartition = new HashMap<>();
-        standbyTasksByPartition = new HashMap<>();
-        prevActiveTasks = new HashSet<>();
-        suspendedTasks = new HashMap<>();
-        suspendedStandbyTasks = new HashMap<>();
-
-        // standby KTables
-        standbyRecords = new HashMap<>();
+        final Map<String, Object> consumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
+        final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs);
+
 
-        this.stateDirectory = stateDirectory;
         final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
-        rebalanceTimeoutMs =  (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
-        pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
-        commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+        final int rebalanceTimeoutMs = (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
+
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId,
+                                                                              restoreConsumer,
+                                                                              time,
+                                                                              config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                                                                              stateRestoreListener);
+
+        Producer<byte[], byte[]> threadProducer = null;
+        if (!eosEnabled) {
+            final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId);
+            log.info("{} Creating shared producer client", logPrefix);
+            threadProducer = clientSupplier.getProducer(producerConfigs);
+        }
+
+        final AbstractTaskCreator activeTaskCreator = new TaskCreator(builder,
+                                                                      config,
+                                                                      streamsMetrics,
+                                                                      stateDirectory,
+                                                                      streamsMetrics.taskCreatedSensor,
+                                                                      changelogReader,
+                                                                      cache,
+                                                                      time,
+                                                                      clientSupplier,
+                                                                      threadProducer,
+                                                                      threadClientId,
+                                                                      rebalanceTimeoutMs,
+                                                                      logPrefix);
+        final AbstractTaskCreator standbyTaskCreator = new StandbyTaskCreator(builder,
+                                                                              config,
+                                                                              streamsMetrics,
+                                                                              stateDirectory,
+                                                                              streamsMetrics.taskCreatedSensor,
+                                                                              changelogReader,
+                                                                              time,
+                                                                              rebalanceTimeoutMs,
+                                                                              logPrefix);
+        final TaskManager taskManager = new TaskManager(changelogReader, time, logPrefix, restoreConsumer, activeTaskCreator, standbyTaskCreator);
+
+        return new StreamThread(builder,
+                                clientId,
+                                threadClientId,
+                                config,
+                                processId,
+                                time,
+                                streamsMetadataState,
+                                taskManager,
+                                streamsMetrics,
+                                clientSupplier,
+                                restoreConsumer,
+                                stateDirectory);
 
-        this.time = time;
-        timerStartedMs = time.milliseconds();
-        lastCommitMs = timerStartedMs;
-        rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
+
+    }
+
+    private static String logPrefix(final String threadClientId) {
+        return String.format("stream-thread [%s]", threadClientId);
     }
 
     /**
@@ -540,22 +689,26 @@ public class StreamThread extends Thread {
         }
     }
 
+    void setRebalanceException(final Throwable rebalanceException) {
+        this.rebalanceException = rebalanceException;
+    }
+
     /**
      * Main event loop for polling, and processing records through topologies.
      */
     private void runLoop() {
         long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
-        consumer.subscribe(sourceTopicPattern, rebalanceListener);
-
+        consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
+        lastCommitMs = time.milliseconds();
         while (stillRunning()) {
             timerStartedMs = time.milliseconds();
 
             // try to fetch some records if necessary
             final ConsumerRecords<byte[], byte[]> records = pollRequests();
-            if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
+            if (records != null && !records.isEmpty() && taskManager.hasActiveTasks()) {
                 streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
                 addRecordsToTasks(records);
-                final long totalProcessed = processAndPunctuateStreamTime(activeTasks, recordsProcessedBeforeCommit);
+                final long totalProcessed = processAndPunctuateStreamTime(taskManager.activeTasks(), recordsProcessedBeforeCommit);
                 if (totalProcessed > 0) {
                     final long processLatency = computeLatency();
                     streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,
@@ -646,7 +799,7 @@ public class StreamThread extends Thread {
             int numAddedRecords = 0;
 
             for (final TopicPartition partition : records.partitions()) {
-                final StreamTask task = activeTasksByPartition.get(partition);
+                final Task task = taskManager.activeTask(partition);
                 numAddedRecords += task.addRecords(partition, records.records(partition));
             }
             streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
@@ -661,7 +814,7 @@ public class StreamThread extends Thread {
      *                                     if UNLIMITED_RECORDS, then commit is never called
      * @return Number of records processed since last commit.
      */
-    private long processAndPunctuateStreamTime(final Map<TaskId, StreamTask> tasks,
+    private long processAndPunctuateStreamTime(final Map<TaskId, Task> tasks,
                                                final long recordsProcessedBeforeCommit) {
 
         long totalProcessedEachRound;
@@ -670,9 +823,9 @@ public class StreamThread extends Thread {
         // until no task has any records left
         do {
             totalProcessedEachRound = 0;
-            final Iterator<Map.Entry<TaskId, StreamTask>> it = tasks.entrySet().iterator();
+            final Iterator<Map.Entry<TaskId, Task>> it = tasks.entrySet().iterator();
             while (it.hasNext()) {
-                final StreamTask task = it.next().getValue();
+                final Task task = it.next().getValue();
                 try {
                     // we processed one record,
                     // if more are buffered waiting for the next round
@@ -684,7 +837,7 @@ public class StreamThread extends Thread {
                         totalProcessedSinceLastMaybeCommit++;
                     }
                 } catch (final ProducerFencedException e) {
-                    closeZombieTask(task);
+                    taskManager.closeZombieTask(task);
                     it.remove();
                 }
             }
@@ -700,7 +853,7 @@ public class StreamThread extends Thread {
         } while (totalProcessedEachRound != 0);
 
         // go over the tasks again to punctuate or commit
-        final RuntimeException e = performOnStreamTasks(new StreamTaskAction() {
+        final RuntimeException e = taskManager.performOnActiveTasks(new TaskManager.TaskAction() {
             private String name;
             @Override
             public String name() {
@@ -708,7 +861,7 @@ public class StreamThread extends Thread {
             }
 
             @Override
-            public void apply(final StreamTask task) {
+            public void apply(final Task task) {
                 name = "punctuate";
                 maybePunctuateStreamTime(task);
                 if (task.commitNeeded()) {
@@ -733,7 +886,7 @@ public class StreamThread extends Thread {
         return totalProcessedSinceLastMaybeCommit;
     }
 
-    private void maybePunctuateStreamTime(final StreamTask task) {
+    private void maybePunctuateStreamTime(final Task task) {
         try {
             // check whether we should punctuate based on the task's partition group timestamp;
             // which are essentially based on record timestamp.
@@ -747,14 +900,14 @@ public class StreamThread extends Thread {
     }
 
     private void maybePunctuateSystemTime() {
-        final RuntimeException e = performOnStreamTasks(new StreamTaskAction() {
+        final RuntimeException e = taskManager.performOnActiveTasks(new TaskManager.TaskAction() {
             @Override
             public String name() {
                 return "punctuate";
             }
 
             @Override
-            public void apply(final StreamTask task) {
+            public void apply(final Task task) {
                 try {
                     // check whether we should punctuate based on system timestamp
                     if (task.maybePunctuateSystemTime()) {
@@ -810,14 +963,14 @@ public class StreamThread extends Thread {
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
             if (log.isTraceEnabled()) {
                 log.trace("{} Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)",
-                        logPrefix, activeTasks.keySet(), standbyTasks.keySet(), now - lastCommitMs, commitTimeMs);
+                        logPrefix, taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
             }
 
             commitAll();
 
             if (log.isDebugEnabled()) {
                 log.debug("{} Committed all active tasks {} and standby tasks {} in {}ms",
-                        logPrefix, activeTasks.keySet(), standbyTasks.keySet(), timerStartedMs - now);
+                        logPrefix, taskManager.activeTaskIds(), taskManager.standbyTaskIds(), timerStartedMs - now);
             }
 
             lastCommitMs = now;
@@ -830,30 +983,32 @@ public class StreamThread extends Thread {
      * Commit the states of all its tasks
      */
     private void commitAll() {
-        final RuntimeException e = performOnStreamTasks(new StreamTaskAction() {
+        final TaskManager.TaskAction commitAction = new TaskManager.TaskAction() {
             @Override
             public String name() {
                 return "commit";
             }
 
             @Override
-            public void apply(final StreamTask task) {
+            public void apply(final Task task) {
                 commitOne(task);
             }
-        });
+        };
+        final RuntimeException e = taskManager.performOnActiveTasks(commitAction);
         if (e != null) {
             throw e;
         }
 
-        for (final StandbyTask task : standbyTasks.values()) {
-            commitOne(task);
+        final RuntimeException standbyTaskCommitException = taskManager.performOnStandbyTasks(commitAction);
+        if (standbyTaskCommitException != null) {
+            throw standbyTaskCommitException;
         }
     }
 
     /**
      * Commit the state of a task
      */
-    private void commitOne(final AbstractTask task) {
+    private void commitOne(final Task task) {
         try {
             task.commit();
         } catch (final CommitFailedException e) {
@@ -869,7 +1024,7 @@ public class StreamThread extends Thread {
     }
 
     private void maybeUpdateStandbyTasks(final long now) {
-        if (!standbyTasks.isEmpty()) {
+        if (taskManager.hasStandbyTasks()) {
             if (processStandbyRecords) {
                 if (!standbyRecords.isEmpty()) {
                     final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<>();
@@ -878,7 +1033,7 @@ public class StreamThread extends Thread {
                         final TopicPartition partition = entry.getKey();
                         List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
                         if (remaining != null) {
-                            final StandbyTask task = standbyTasksByPartition.get(partition);
+                            final Task task = taskManager.standbyTask(partition);
                             remaining = task.update(partition, remaining);
                             if (remaining != null) {
                                 remainingStandbyRecords.put(partition, remaining);
@@ -890,7 +1045,7 @@ public class StreamThread extends Thread {
 
                     standbyRecords = remainingStandbyRecords;
 
-                    log.debug("{} Updated standby tasks {} in {}ms", logPrefix, standbyTasks.keySet(), time.milliseconds() - now);
+                    log.debug("{} Updated standby tasks {} in {}ms", logPrefix, taskManager.standbyTaskIds(), time.milliseconds() - now);
                 }
                 processStandbyRecords = false;
             }
@@ -899,7 +1054,7 @@ public class StreamThread extends Thread {
 
             if (!records.isEmpty()) {
                 for (final TopicPartition partition : records.partitions()) {
-                    final StandbyTask task = standbyTasksByPartition.get(partition);
+                    final Task task = taskManager.standbyTask(partition);
 
                     if (task == null) {
                         throw new StreamsException(logPrefix + " Missing standby task for partition " + partition);
@@ -950,15 +1105,25 @@ public class StreamThread extends Thread {
         }
     }
 
-    public Map<TaskId, StreamTask> tasks() {
-        return Collections.unmodifiableMap(activeTasks);
+    public Map<TaskId, Task> tasks() {
+        return Collections.unmodifiableMap(taskManager.activeTasks());
     }
 
     /**
      * Returns ids of tasks that were being executed before the rebalance.
      */
     public Set<TaskId> prevActiveTasks() {
-        return Collections.unmodifiableSet(prevActiveTasks);
+        return taskManager.prevActiveTaskIds();
+    }
+
+    @Override
+    public InternalTopologyBuilder builder() {
+        return builder;
+    }
+
+    @Override
+    public String name() {
+        return getName();
     }
 
     /**
@@ -991,6 +1156,21 @@ public class StreamThread extends Thread {
         return tasks;
     }
 
+    @Override
+    public UUID processId() {
+        return processId;
+    }
+
+    @Override
+    public StreamsConfig config() {
+        return config;
+    }
+
+    @Override
+    public PartitionGrouper partitionGrouper() {
+        return partitionGrouper;
+    }
+
     /**
      * Set the {@link StreamThread.StateListener} to be notified when state changes. Note this API is internal to
      * Kafka Streams and is not intended to be used by an external application.
@@ -1000,16 +1180,6 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Set the listener invoked at the beginning, end of batch updates and the conclusion of
-     * restoring a {@link StateStore}.
-     *
-     * @param globalStateRestoreListener  listener for capturing state store restoration status.
-     */
-    public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) {
-        this.globalStateRestoreListener = globalStateRestoreListener;
-    }
-
-    /**
      * @return The state this instance is in
      */
     public State state() {
@@ -1065,6 +1235,7 @@ public class StreamThread extends Thread {
      * This is useful in debugging scenarios.
      * @return A string representation of the StreamThread instance.
      */
+    @SuppressWarnings("ThrowableNotThrown")
     public String toString(final String indent) {
         final StringBuilder sb = new StringBuilder()
             .append(indent).append("StreamsThread appId: ").append(applicationId).append("\n")
@@ -1072,46 +1243,41 @@ public class StreamThread extends Thread {
             .append(indent).append("\tStreamsThread threadId: ").append(getName()).append("\n");
 
         // iterate and print active tasks
-        if (activeTasks != null) {
-            sb.append(indent).append("\tActive tasks:\n");
-            for (final Map.Entry<TaskId, StreamTask> entry : activeTasks.entrySet()) {
-                final StreamTask task = entry.getValue();
-                sb.append(indent).append(task.toString(indent + "\t\t"));
+        final TaskManager.TaskAction printAction = new TaskManager.TaskAction() {
+            @Override
+            public String name() {
+                return "print";
             }
-        }
 
-        // iterate and print standby tasks
-        if (standbyTasks != null) {
-            sb.append(indent).append("\tStandby tasks:\n");
-            for (final StandbyTask task : standbyTasks.values()) {
+            @Override
+            public void apply(final Task task) {
                 sb.append(indent).append(task.toString(indent + "\t\t"));
             }
-            sb.append("\n");
-        }
+        };
 
+        sb.append(indent).append("\tActive tasks:\n");
+        taskManager.performOnActiveTasks(printAction);
+        sb.append(indent).append("\tStandby tasks:\n");
+        taskManager.performOnStandbyTasks(printAction);
         return sb.toString();
     }
 
     String threadClientId() {
-        return threadClientId;
+        return getName();
     }
 
-    void setPartitionAssignor(final StreamPartitionAssignor partitionAssignor) {
-        this.partitionAssignor = partitionAssignor;
+    public void setThreadMetadataProvider(final ThreadMetadataProvider metadataProvider) {
+        this.metadataProvider = metadataProvider;
+        taskManager.setThreadMetadataProvider(metadataProvider);
     }
 
     private void shutdown(final boolean cleanRun) {
         log.info("{} Shutting down", logPrefix);
-        shutdownTasksAndState(cleanRun);
+        taskManager.shutdown(cleanRun);
 
         // close all embedded clients
-        if (threadProducer != null) {
-            try {
-                threadProducer.close();
-            } catch (final Throwable e) {
-                log.error("{} Failed to close producer due to the following error:", logPrefix, e);
-            }
-        }
+        taskManager.closeProducer();
+
         try {
             consumer.close();
         } catch (final Throwable e) {
@@ -1122,105 +1288,13 @@ public class StreamThread extends Thread {
         } catch (final Throwable e) {
             log.error("{} Failed to close restore consumer due to the following error:", logPrefix, e);
         }
-        try {
-            partitionAssignor.close();
-        } catch (final Throwable e) {
-            log.error("{} Failed to close KafkaStreamClient due to the following error:", logPrefix, e);
-        }
-
-        removeStreamTasks();
-        removeStandbyTasks();
-
-        // clean up global tasks
 
+        taskManager.removeTasks();
         log.info("{} Stream thread shutdown complete", logPrefix);
         setState(State.DEAD);
         streamsMetrics.removeAllSensors();
     }
 
-    private void shutdownTasksAndState(final boolean cleanRun) {
-        log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}",
-            logPrefix, activeTasks.keySet(), standbyTasks.keySet(),
-            suspendedTasks.keySet(), suspendedStandbyTasks.keySet());
-
-        for (final AbstractTask task : allTasks()) {
-            try {
-                task.close(cleanRun);
-            } catch (final RuntimeException e) {
-                log.error("{} Failed while closing {} {} due to the following error:",
-                    logPrefix,
-                    task.getClass().getSimpleName(),
-                    task.id(),
-                    e);
-            }
-        }
-
-        // remove the changelog partitions from restore consumer
-        unAssignChangeLogPartitions();
-    }
-
-    /**
-     * Similar to shutdownTasksAndState, however does not close the task managers, in the hope that
-     * soon the tasks will be assigned again
-     */
-    private void suspendTasksAndState()  {
-        log.debug("{} Suspending all active tasks {} and standby tasks {}",
-            logPrefix, activeTasks.keySet(), standbyTasks.keySet());
-
-        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-
-        firstException.compareAndSet(null, performOnStreamTasks(new StreamTaskAction() {
-            @Override
-            public String name() {
-                return "suspend";
-            }
-
-            @Override
-            public void apply(final StreamTask task) {
-                try {
-                    task.suspend();
-                } catch (final CommitFailedException e) {
-                    // commit failed during suspension. Just log it.
-                    log.warn("{} Failed to commit task {} state when suspending due to CommitFailedException", logPrefix, task.id);
-                } catch (final Exception e) {
-                    log.error("{} Suspending task {} failed due to the following error:", logPrefix, task.id, e);
-                    try {
-                        task.close(false);
-                    } catch (final Exception f) {
-                        log.error("{} After suspending failed, closing the same task {} failed again due to the following error:", logPrefix, task.id, f);
-                    }
-                    throw e;
-                }
-            }
-        }));
-
-        for (final StandbyTask task : standbyTasks.values()) {
-            try {
-                try {
-                    task.suspend();
-                } catch (final Exception e) {
-                    log.error("{} Suspending standby task {} failed due to the following error:", logPrefix, task.id, e);
-                    try {
-                        task.close(false);
-                    } catch (final Exception f) {
-                        log.error("{} After suspending failed, closing the same standby task {} failed again due to the following error:", logPrefix, task.id, f);
-                    }
-                    throw e;
-                }
-            } catch (final RuntimeException e) {
-                firstException.compareAndSet(null, e);
-            }
-        }
-
-        // remove the changelog partitions from restore consumer
-        firstException.compareAndSet(null, unAssignChangeLogPartitions());
-
-        updateSuspendedTasks();
-
-        if (firstException.get() != null) {
-            throw new StreamsException(logPrefix + " failed to suspend stream tasks", firstException.get());
-        }
-    }
 
     private RuntimeException unAssignChangeLogPartitions() {
         try {
@@ -1233,313 +1307,11 @@ public class StreamThread extends Thread {
         return null;
     }
 
-    private List<AbstractTask> allTasks() {
-        final List<AbstractTask> tasks = activeAndStandbytasks();
-        tasks.addAll(suspendedAndSuspendedStandbytasks());
-        return tasks;
-    }
-
-    private List<AbstractTask> activeAndStandbytasks() {
-        final List<AbstractTask> tasks = new ArrayList<AbstractTask>(activeTasks.values());
-        tasks.addAll(standbyTasks.values());
-        return tasks;
-    }
-
-    private List<AbstractTask> suspendedAndSuspendedStandbytasks() {
-        final List<AbstractTask> tasks = new ArrayList<AbstractTask>(suspendedTasks.values());
-        tasks.addAll(suspendedStandbyTasks.values());
-        return tasks;
-    }
-
-    private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
-        if (suspendedTasks.containsKey(taskId)) {
-            final StreamTask task = suspendedTasks.get(taskId);
-            if (task.partitions().equals(partitions)) {
-                return task;
-            }
-        }
-        return null;
-    }
-
-    private StandbyTask findMatchingSuspendedStandbyTask(final TaskId taskId, final Set<TopicPartition> partitions) {
-        if (suspendedStandbyTasks.containsKey(taskId)) {
-            final StandbyTask task = suspendedStandbyTasks.get(taskId);
-            if (task.partitions().equals(partitions)) {
-                return task;
-            }
-        }
-        return null;
-    }
-
-    private void closeNonAssignedSuspendedTasks() {
-        final Map<TaskId, Set<TopicPartition>> newTaskAssignment = partitionAssignor.activeTasks();
-        final Iterator<Map.Entry<TaskId, StreamTask>> suspendedTaskIterator = suspendedTasks.entrySet().iterator();
-        while (suspendedTaskIterator.hasNext()) {
-            final Map.Entry<TaskId, StreamTask> next = suspendedTaskIterator.next();
-            final StreamTask task = next.getValue();
-            final Set<TopicPartition> assignedPartitionsForTask = newTaskAssignment.get(next.getKey());
-            if (!task.partitions().equals(assignedPartitionsForTask)) {
-                log.debug("{} Closing suspended and not re-assigned task {}", logPrefix, task.id());
-                try {
-                    task.closeSuspended(true, null);
-                } catch (final Exception e) {
-                    log.error("{} Failed to close suspended task {} due to the following error:", logPrefix, next.getKey(), e);
-                } finally {
-                    suspendedTaskIterator.remove();
-                }
-            }
-        }
-    }
-
-    private void closeNonAssignedSuspendedStandbyTasks() {
-        final Set<TaskId> currentSuspendedTaskIds = partitionAssignor.standbyTasks().keySet();
-        final Iterator<Map.Entry<TaskId, StandbyTask>> standByTaskIterator = suspendedStandbyTasks.entrySet().iterator();
-        while (standByTaskIterator.hasNext()) {
-            final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next();
-            if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) {
-                final StandbyTask task = suspendedTask.getValue();
-                log.debug("{} Closing suspended and not re-assigned standby task {}", logPrefix, task.id());
-                try {
-                    task.close(true);
-                } catch (final Exception e) {
-                    log.error("{} Failed to remove suspended standby task {} due to the following error:", logPrefix, task.id(), e);
-                } finally {
-                    standByTaskIterator.remove();
-                }
-            }
-        }
-    }
-
-    // visible for testing
-    protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
-        streamsMetrics.taskCreatedSensor.record();
-
-        try {
-            return new StreamTask(
-                id,
-                applicationId,
-                partitions,
-                builder.build(id.topicGroupId),
-                consumer,
-                storeChangelogReader,
-                config,
-                streamsMetrics,
-                stateDirectory,
-                cache,
-                time,
-                createProducer(id));
-        } finally {
-            log.trace("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions);
-        }
-    }
-
-    private Producer<byte[], byte[]> createProducer(final TaskId id) {
-
-        final Producer<byte[], byte[]> producer;
-        if (eosEnabled) {
-            final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId + "-" + id);
-            log.info("{} Creating producer client for task {}", logPrefix, id);
-            producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + id);
-            producer = clientSupplier.getProducer(producerConfigs);
-        } else {
-            if (threadProducer == null) {
-                final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId);
-                log.info("{} Creating shared producer client", logPrefix);
-                threadProducer = clientSupplier.getProducer(producerConfigs);
-            }
-            producer = threadProducer;
-        }
-
-        return producer;
-    }
-
-    private void addStreamTasks(final Collection<TopicPartition> assignment, final long start) {
-        if (partitionAssignor == null) {
-            throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
-        }
-
-        final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>();
-
-        // collect newly assigned tasks and reopen re-assigned tasks
-        log.debug("{} Adding assigned tasks as active: {}", logPrefix, partitionAssignor.activeTasks());
-        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.activeTasks().entrySet()) {
-            final TaskId taskId = entry.getKey();
-            final Set<TopicPartition> partitions = entry.getValue();
-
-            if (assignment.containsAll(partitions)) {
-                try {
-                    final StreamTask task = findMatchingSuspendedTask(taskId, partitions);
-                    if (task != null) {
-                        suspendedTasks.remove(taskId);
-                        task.resume();
-
-                        activeTasks.put(taskId, task);
-
-                        for (final TopicPartition partition : partitions) {
-                            activeTasksByPartition.put(partition, task);
-                        }
-                    } else {
-                        newTasks.put(taskId, partitions);
-                    }
-                } catch (final StreamsException e) {
-                    log.error("{} Failed to create an active task {} due to the following error:", logPrefix, taskId, e);
-                    throw e;
-                }
-            } else {
-                log.warn("{} Task {} owned partitions {} are not contained in the assignment {}", logPrefix, taskId, partitions, assignment);
-            }
-        }
-
-        // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
-        // -> other thread will call removeSuspendedTasks(); eventually
-        log.trace("{} New active tasks to be created: {}", logPrefix, newTasks);
-
-        taskCreator.retryWithBackoff(newTasks, start);
-    }
-
-    // visible for testing
-    protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
-        streamsMetrics.taskCreatedSensor.record();
-
-        final ProcessorTopology topology = builder.build(id.topicGroupId);
-
-        if (!topology.stateStores().isEmpty()) {
-            try {
-                return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
-            } finally {
-                log.trace("{} Created standby task {} with assigned partitions {}", logPrefix, id, partitions);
-            }
-        } else {
-            log.trace("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, id, partitions);
-
-            return null;
-        }
-    }
-
-    private void addStandbyTasks(final long start) {
-        if (partitionAssignor == null) {
-            throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding standby tasks: this should not happen.");
-        }
-
-        final Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
-
-        final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>();
-
-        log.debug("{} Adding assigned standby tasks {}", logPrefix, partitionAssignor.standbyTasks());
-        // collect newly assigned standby tasks and reopen re-assigned standby tasks
-        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.standbyTasks().entrySet()) {
-            final TaskId taskId = entry.getKey();
-            final Set<TopicPartition> partitions = entry.getValue();
-            final StandbyTask task = findMatchingSuspendedStandbyTask(taskId, partitions);
-
-            if (task != null) {
-                suspendedStandbyTasks.remove(taskId);
-                task.resume();
-            } else {
-                newStandbyTasks.put(taskId, partitions);
-            }
-
-            updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
-        }
-
-        // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
-        // -> other thread will call removeSuspendedStandbyTasks(); eventually
-        log.trace("{} New standby tasks to be created: {}", logPrefix, newStandbyTasks);
-
-        new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start);
-
-        restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
-
-        for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
-            final TopicPartition partition = entry.getKey();
-            final long offset = entry.getValue();
-            if (offset >= 0) {
-                restoreConsumer.seek(partition, offset);
-            } else {
-                restoreConsumer.seekToBeginning(singleton(partition));
-            }
-        }
-    }
-
-    private void updateStandByTaskMaps(final Map<TopicPartition, Long> checkpointedOffsets,
-                                       final TaskId taskId,
-                                       final Set<TopicPartition> partitions,
-                                       final StandbyTask task) {
-        if (task != null) {
-            standbyTasks.put(taskId, task);
-            for (final TopicPartition partition : partitions) {
-                standbyTasksByPartition.put(partition, task);
-            }
-            // collect checked pointed offsets to position the restore consumer
-            // this include all partitions from which we restore states
-            for (final TopicPartition partition : task.checkpointedOffsets().keySet()) {
-                standbyTasksByPartition.put(partition, task);
-            }
-            checkpointedOffsets.putAll(task.checkpointedOffsets());
-        }
-    }
-
-    private void updateSuspendedTasks() {
-        suspendedTasks.clear();
-        suspendedTasks.putAll(activeTasks);
-        suspendedStandbyTasks.putAll(standbyTasks);
-    }
-
-    private void removeStreamTasks() {
-        log.debug("{} Removing all active tasks {}", logPrefix, activeTasks.keySet());
-
-        try {
-            prevActiveTasks.clear();
-            prevActiveTasks.addAll(activeTasks.keySet());
-
-            activeTasks.clear();
-            activeTasksByPartition.clear();
-        } catch (final Exception e) {
-            log.error("{} Failed to remove stream tasks due to the following error:", logPrefix, e);
-        }
-    }
-
-    private void removeStandbyTasks() {
-        log.debug("{} Removing all standby tasks {}", logPrefix, standbyTasks.keySet());
-
-        standbyTasks.clear();
-        standbyTasksByPartition.clear();
+    private void clearStandbyRecords() {
         standbyRecords.clear();
     }
 
-    private void closeZombieTask(final StreamTask task) {
-        log.warn("{} Producer of task {} fenced; closing zombie task", logPrefix, task.id);
-        try {
-            task.close(false);
-        } catch (final Exception e) {
-            log.warn("{} Failed to close zombie task due to {}, ignore and proceed", logPrefix, e);
-        }
-        activeTasks.remove(task.id);
-    }
-
-
-    private RuntimeException performOnStreamTasks(final StreamTaskAction action) {
-        RuntimeException firstException = null;
-        final Iterator<Map.Entry<TaskId, StreamTask>> it = activeTasks.entrySet().iterator();
-        while (it.hasNext()) {
-            final StreamTask task = it.next().getValue();
-            try {
-                action.apply(task);
-            } catch (final ProducerFencedException e) {
-                closeZombieTask(task);
-                it.remove();
-            } catch (final RuntimeException t) {
-                log.error("{} Failed to {} stream task {} due to the following error:",
-                    logPrefix,
-                    action.name(),
-                    task.id(),
-                    t);
-                if (firstException == null) {
-                    firstException = t;
-                }
-            }
-        }
-
-        return firstException;
+    private void refreshMetadataState() {
+        streamsMetadataState.onChange(metadataProvider.getPartitionsByHostState(), metadataProvider.clusterMetadata());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 3fd1613..f470b74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -195,7 +195,7 @@ public class StreamsMetadataState {
      * @param currentState  the current mapping of {@link HostInfo} -> {@link TopicPartition}s
      * @param clusterMetadata    the current clusterMetadata {@link Cluster}
      */
-    public synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> currentState, final Cluster clusterMetadata) {
+    synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> currentState, final Cluster clusterMetadata) {
         this.clusterMetadata = clusterMetadata;
         rebuildMetadata(currentState);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
new file mode 100644
index 0000000..177306a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public interface Task {
+    void resume();
+
+    void commit();
+
+    void suspend();
+
+    void close(boolean clean);
+
+    TaskId id();
+
+    String applicationId();
+
+    Set<TopicPartition> partitions();
+
+    ProcessorTopology topology();
+
+    ProcessorContext context();
+
+    StateStore getStore(String name);
+
+    void closeSuspended(boolean clean, RuntimeException e);
+
+    Map<TopicPartition, Long> checkpointedOffsets();
+
+    boolean process();
+
+    boolean commitNeeded();
+
+    boolean maybePunctuateStreamTime();
+
+    boolean maybePunctuateSystemTime();
+
+    List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> remaining);
+
+    String toString(String indent);
+
+    int addRecords(TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records);
+}


Mime
View raw message