kafka-commits mailing list archives

From: damian...@apache.org
Subject: [2/3] kafka git commit: KAFKA-5754; Refactor Streams to use LogContext
Date: Mon, 18 Sep 2017 08:53:26 GMT
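
For context before the diffs: this commit replaces the previous Streams pattern of a static per-class Logger plus a logPrefix String threaded through every log call with a per-instance Logger built from org.apache.kafka.common.utils.LogContext, which prepends the prefix automatically. A minimal sketch of the pattern, assuming kafka-clients and slf4j on the classpath; the class name and prefix value are illustrative, while LogContext and its logger(Class) factory are the real API:

import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class PrefixedLoggerSketch {

    private final Logger log; // was: a static Logger from LoggerFactory

    public PrefixedLoggerSketch(final String threadClientId) {
        // The prefix is supplied once, trailing space included, instead of
        // being passed as the first "{}" argument of every log statement.
        final LogContext logContext =
            new LogContext(String.format("stream-thread [%s] ", threadClientId));
        this.log = logContext.logger(PrefixedLoggerSketch.class);
    }

    public void start() {
        // Emitted as: "stream-thread [my-thread-1] Starting"
        log.info("Starting");
    }
}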
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 73f443e..b753cf9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.SampledStat;
 import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
@@ -49,7 +50,6 @@ import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -68,7 +68,7 @@ import static java.util.Collections.singleton;
 
 public class StreamThread extends Thread implements ThreadDataProvider {
 
-    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
+    private final Logger log;
     private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     /**
@@ -193,10 +193,10 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                 // refused but we do not throw exception here
                 return false;
             } else if (!state.isValidTransition(newState)) {
-                log.error("{} Unexpected state transition from {} to {}", logPrefix, oldState, newState);
-                throw new StreamsException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState);
+                log.error("Unexpected state transition from {} to {}", oldState, newState);
+                throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
             } else {
-                log.info("{} State transition from {} to {}", logPrefix, oldState, newState);
+                log.info("State transition from {} to {}", oldState, newState);
             }
 
             state = newState;
@@ -229,24 +229,23 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         private final Time time;
         private final TaskManager taskManager;
         private final StreamThread streamThread;
-        private final String logPrefix;
+        private final Logger log;
 
         RebalanceListener(final Time time,
                           final TaskManager taskManager,
                           final StreamThread streamThread,
-                          final String logPrefix) {
+                          final Logger log) {
             this.time = time;
             this.taskManager = taskManager;
             this.streamThread = streamThread;
-            this.logPrefix = logPrefix;
+            this.log = log;
         }
 
         @Override
         public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
-            log.debug("{} at state {}: partitions {} assigned at the end of consumer rebalance.\n" +
+            log.debug("at state {}: partitions {} assigned at the end of consumer rebalance.\n" +
                             "\tcurrent suspended active tasks: {}\n" +
                             "\tcurrent suspended standby tasks: {}\n",
-                    logPrefix,
                     streamThread.state,
                     assignment,
                     taskManager.suspendedActiveTaskIds(),
@@ -260,15 +259,14 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                 taskManager.createTasks(assignment);
                 streamThread.refreshMetadataState();
             } catch (final Throwable t) {
-                log.error("{} Error caught during partition assignment, " +
-                        "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
+                log.error("Error caught during partition assignment, " +
+                        "will abort the current process and re-throw at the end of rebalance: {}", t.getMessage());
                 streamThread.setRebalanceException(t);
             } finally {
-                log.info("{} partition assignment took {} ms.\n" +
+                log.info("partition assignment took {} ms.\n" +
                                  "\tcurrent active tasks: {}\n" +
                                  "\tcurrent standby tasks: {}\n" +
                                  "\tprevious active tasks: {}\n",
-                         logPrefix,
                          time.milliseconds() - start,
                          taskManager.activeTaskIds(),
                          taskManager.standbyTaskIds(),
@@ -278,10 +276,9 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
         @Override
         public void onPartitionsRevoked(final Collection<TopicPartition> assignment) {
-            log.debug("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.\n" +
+            log.debug("at state {}: partitions {} revoked at the beginning of consumer rebalance.\n" +
                     "\tcurrent assigned active tasks: {}\n" +
                     "\tcurrent assigned standby tasks: {}\n",
-                logPrefix,
                 streamThread.state,
                 assignment,
                 taskManager.activeTaskIds(),
@@ -293,17 +290,16 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                     // suspend active tasks
                     taskManager.suspendTasksAndState();
                 } catch (final Throwable t) {
-                    log.error("{} Error caught during partition revocation, " +
-                            "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
+                    log.error("Error caught during partition revocation, " +
+                            "will abort the current process and re-throw at the end of rebalance: {}", t.getMessage());
                     streamThread.setRebalanceException(t);
                 } finally {
                     streamThread.refreshMetadataState();
                     streamThread.clearStandbyRecords();
 
-                    log.info("{} partition revocation took {} ms.\n" +
+                    log.info("partition revocation took {} ms.\n" +
                                     "\tsuspended active tasks: {}\n" +
                                     "\tsuspended standby tasks: {}",
-                            logPrefix,
                             time.milliseconds() - start,
                             taskManager.suspendedActiveTaskIds(),
                             taskManager.suspendedStandbyTaskIds());
@@ -321,7 +317,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         final Sensor taskCreatedSensor;
         final ChangelogReader storeChangelogReader;
         final Time time;
-        final String logPrefix;
+        final Logger log;
+
 
         AbstractTaskCreator(final InternalTopologyBuilder builder,
                             final StreamsConfig config,
@@ -330,7 +327,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                             final Sensor taskCreatedSensor,
                             final ChangelogReader storeChangelogReader,
                             final Time time,
-                            final String logPrefix) {
+                            final Logger log) {
             this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
             this.builder = builder;
             this.config = config;
@@ -339,7 +336,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             this.taskCreatedSensor = taskCreatedSensor;
             this.storeChangelogReader = storeChangelogReader;
             this.time = time;
-            this.logPrefix = logPrefix;
+            this.log = log;
         }
 
         Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
@@ -349,7 +346,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                 final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
                 Task task = createTask(consumer, taskId, partitions);
                 if (task != null) {
-                    log.trace("{} Created task {} with assigned partitions {}", logPrefix, taskId, partitions);
+                    log.trace("Created task {} with assigned partitions {}", taskId, partitions);
                     createdTasks.add(task);
                 }
 
@@ -368,7 +365,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         private final String threadClientId;
         private final Producer<byte[], byte[]> threadProducer;
 
-
         TaskCreator(final InternalTopologyBuilder builder,
                     final StreamsConfig config,
                     final StreamsMetrics streamsMetrics,
@@ -380,7 +376,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                     final KafkaClientSupplier clientSupplier,
                     final Producer<byte[], byte[]> threadProducer,
                     final String threadClientId,
-                    final String logPrefix) {
+                    final Logger log) {
             super(builder,
                   config,
                   streamsMetrics,
@@ -388,7 +384,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                   taskCreatedSensor,
                   storeChangelogReader,
                   time,
-                  logPrefix);
+                  log);
             this.cache = cache;
             this.clientSupplier = clientSupplier;
             this.threadProducer = threadProducer;
@@ -419,7 +415,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             // eos
             if (threadProducer == null) {
                 final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId + "-" + id);
-                log.info("{} Creating producer client for task {}", logPrefix, id);
+                log.info("Creating producer client for task {}", id);
                 producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + id);
                 return clientSupplier.getProducer(producerConfigs);
             }
@@ -434,7 +430,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                 try {
                     threadProducer.close();
                 } catch (final Throwable e) {
-                    log.error("{} Failed to close producer due to the following error:", logPrefix, e);
+                    log.error("Failed to close producer due to the following error:", e);
                 }
             }
         }
@@ -448,7 +444,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                            final Sensor taskCreatedSensor,
                            final ChangelogReader storeChangelogReader,
                            final Time time,
-                           final String logPrefix) {
+                           final Logger log) {
             super(builder,
                   config,
                   streamsMetrics,
@@ -456,7 +452,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                   taskCreatedSensor,
                   storeChangelogReader,
                   time,
-                  logPrefix);
+                  log);
         }
 
         @Override
@@ -468,7 +464,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             if (!topology.stateStores().isEmpty()) {
                 return new StandbyTask(taskId, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
             } else {
-                log.trace("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, taskId, partitions);
+                log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", taskId, partitions);
 
                 return null;
             }
@@ -601,21 +597,24 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         this.time = time;
         this.streamsMetadataState = streamsMetadataState;
         this.taskManager = taskManager;
-        this.logPrefix = logPrefix(threadClientId);
+        this.logPrefix = String.format("stream-thread [%s] ", threadClientId);
         this.streamsMetrics = streamsMetrics;
         this.restoreConsumer = restoreConsumer;
         this.stateDirectory = stateDirectory;
-        this.rebalanceListener = new RebalanceListener(time, taskManager, this, logPrefix);
         this.config = config;
         this.stateLock = new Object();
         this.standbyRecords = new HashMap<>();
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
-        log.info("{} Creating consumer client", logPrefix);
+        final LogContext logContext = new LogContext(this.logPrefix);
+        this.log = logContext.logger(StreamThread.class);
+        this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);
+
+        log.info("Creating consumer client");
         final Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, threadClientId);
 
         if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
             originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            log.info("{} Custom offset resets specified updating configs original auto offset reset {}", logPrefix, originalReset);
+            log.info("Custom offset resets specified updating configs original auto offset reset {}", originalReset);
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
         }
         this.consumer = clientSupplier.getConsumer(consumerConfigs);
@@ -643,25 +642,29 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                                                                                      Collections.singletonMap("client-id",
                                                                                                               threadClientId));
 
-        final String logPrefix = logPrefix(threadClientId);
+        final String logPrefix = String.format("stream-thread [%s] ", threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+        final Logger log = logContext.logger(StreamThread.class);
+
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
-            log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes", logPrefix);
+            log.warn("Negative cache size passed in thread. Reverting to cache size of 0 bytes");
         }
-        final ThreadCache cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
+        final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
         final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
 
-        log.info("{} Creating restore consumer client", logPrefix);
+        log.info("Creating restore consumer client");
         final Map<String, Object> consumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs);
         final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId,
                                                                               restoreConsumer,
-                                                                              stateRestoreListener);
+                                                                              stateRestoreListener,
+                                                                              logContext);
 
         Producer<byte[], byte[]> threadProducer = null;
         if (!eosEnabled) {
             final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId);
-            log.info("{} Creating shared producer client", logPrefix);
+            log.info("Creating shared producer client");
             threadProducer = clientSupplier.getProducer(producerConfigs);
         }
 
@@ -676,7 +679,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                                                                       clientSupplier,
                                                                       threadProducer,
                                                                       threadClientId,
-                                                                      logPrefix);
+                                                                      log);
         final AbstractTaskCreator standbyTaskCreator = new StandbyTaskCreator(builder,
                                                                               config,
                                                                               streamsMetrics,
@@ -684,16 +687,16 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                                                                               streamsMetrics.taskCreatedSensor,
                                                                               changelogReader,
                                                                               time,
-                                                                              logPrefix);
+                                                                              log);
         final TaskManager taskManager = new TaskManager(changelogReader,
                                                         logPrefix,
                                                         restoreConsumer,
                                                         activeTaskCreator,
                                                         standbyTaskCreator,
-                                                        new AssignedTasks(logPrefix,
+                                                        new AssignedTasks(logContext,
                                                                           "stream task"
                                                         ),
-                                                        new AssignedTasks(logPrefix,
+                                                        new AssignedTasks(logContext,
                                                                           "standby task"
                                                         ));
 
@@ -713,10 +716,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
     }
 
-    private static String logPrefix(final String threadClientId) {
-        return String.format("stream-thread [%s]", threadClientId);
-    }
-
     /**
      * Execute the stream processors
      *
@@ -725,7 +724,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
      */
     @Override
     public void run() {
-        log.info("{} Starting", logPrefix);
+        log.info("Starting");
         setState(State.RUNNING);
         boolean cleanRun = false;
         try {
@@ -737,7 +736,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         } catch (final Exception e) {
             // we have caught all Kafka related exceptions, and other runtime exceptions
             // should be due to user application errors
-            log.error("{} Encountered the following error during processing:", logPrefix, e);
+            log.error("Encountered the following error during processing:", e);
             throw e;
         } finally {
             completeShutdown(cleanRun);
@@ -808,7 +807,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
         if (rebalanceException != null) {
             if (!(rebalanceException instanceof ProducerFencedException)) {
-                throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException);
+                throw new StreamsException(logPrefix + "Failed to rebalance.", rebalanceException);
             }
         }
 
@@ -823,9 +822,9 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
         for (final TopicPartition partition : partitions) {
             if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                addToResetList(partition, seekToBeginning, "{} Setting topic '{}' to consume from {} offset", "earliest", loggedTopics);
+                addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", "earliest", loggedTopics);
             } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
-                addToResetList(partition, seekToEnd, "{} Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
+                addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
             } else {
                 if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
                     final String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
@@ -835,9 +834,9 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                 }
 
                 if (originalReset.equals("earliest")) {
-                    addToResetList(partition, seekToBeginning, "{} No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
+                    addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
                 } else if (originalReset.equals("latest")) {
-                    addToResetList(partition, seekToEnd, "{} No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
+                    addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
                 }
             }
         }
@@ -853,7 +852,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
     private void addToResetList(final TopicPartition partition, final Set<TopicPartition> partitions, final String logMessage, final String resetPolicy, final Set<String> loggedTopics) {
         final String topic = partition.topic();
         if (loggedTopics.add(topic)) {
-            log.info(logMessage, logPrefix, topic, resetPolicy);
+            log.info(logMessage, topic, resetPolicy);
         }
         partitions.add(partition);
     }
@@ -937,13 +936,13 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         if (processLatency > 0 && processLatency > commitTime) {
             // push down
             recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
-            log.debug("{} processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}",
-                logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
+            log.debug("processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}",
+                    processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
         } else if (prevRecordsProcessedBeforeCommit != UNLIMITED_RECORDS && processLatency > 0) {
             // push up
             recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
-            log.debug("{} processing latency {} < commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}",
-                logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
+            log.debug("processing latency {} < commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}",
+                    processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit);
         }
 
         return recordsProcessedBeforeCommit;
@@ -955,8 +954,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
     void maybeCommit(final long now) {
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
             if (log.isTraceEnabled()) {
-                log.trace("{} Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)",
-                        logPrefix, taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
+                log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)",
+                        taskManager.activeTaskIds(), taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
             }
 
             int committed = taskManager.commitAll();
@@ -964,8 +963,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                 streamsMetrics.commitTimeSensor.record(computeLatency() / (double) committed, timerStartedMs);
             }
             if (log.isDebugEnabled()) {
-                log.info("{} Committed all active tasks {} and standby tasks {} in {}ms",
-                        logPrefix,  taskManager.activeTaskIds(), taskManager.standbyTaskIds(), timerStartedMs - now);
+                log.info("Committed all active tasks {} and standby tasks {} in {}ms",
+                        taskManager.activeTaskIds(), taskManager.standbyTaskIds(), timerStartedMs - now);
             }
 
             lastCommitMs = now;
@@ -996,7 +995,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
                     standbyRecords = remainingStandbyRecords;
 
-                    log.debug("{} Updated standby tasks {} in {}ms", logPrefix, taskManager.standbyTaskIds(), time.milliseconds() - now);
+                    log.debug("Updated standby tasks {} in {}ms", taskManager.standbyTaskIds(), time.milliseconds() - now);
                 }
                 processStandbyRecords = false;
             }
@@ -1008,7 +1007,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                     final Task task = taskManager.standbyTask(partition);
 
                     if (task == null) {
-                        throw new StreamsException(logPrefix + " Missing standby task for partition " + partition);
+                        throw new StreamsException(logPrefix + "Missing standby task for partition " + partition);
                     }
 
                     final List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
@@ -1041,7 +1040,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
      * (e.g., in testing), hence the state is set only the first time
      */
     public void shutdown() {
-        log.info("{} Informed to shut down", logPrefix);
+        log.info("Informed to shut down");
         setState(State.PENDING_SHUTDOWN);
     }
 
@@ -1152,23 +1151,23 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         // intentionally do not check the returned flag
         setState(State.PENDING_SHUTDOWN);
 
-        log.info("{} Shutting down", logPrefix);
+        log.info("Shutting down");
 
         taskManager.shutdown(cleanRun);
         try {
             consumer.close();
         } catch (final Throwable e) {
-            log.error("{} Failed to close consumer due to the following error:", logPrefix, e);
+            log.error("Failed to close consumer due to the following error:", e);
         }
         try {
             restoreConsumer.close();
         } catch (final Throwable e) {
-            log.error("{} Failed to close restore consumer due to the following error:", logPrefix, e);
+            log.error("Failed to close restore consumer due to the following error:", e);
         }
         streamsMetrics.removeAllSensors();
 
         setState(State.DEAD);
-        log.info("{} Shutdown complete", logPrefix);
+        log.info("Shutdown complete");
     }
 
     private void clearStandbyRecords() {

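One subtlety in the StreamThread hunks above: the removed logPrefix(threadClientId) helper produced "stream-thread [%s]" with no trailing space, while the new inline format string is "stream-thread [%s] ". Because the space now lives in the prefix, concatenated exception texts such as the StreamsException messages drop their leading space. A small sketch of the convention, with a hypothetical thread id:

public class PrefixConventionSketch {
    public static void main(final String[] args) {
        final String threadClientId = "example-thread-1"; // hypothetical value
        // New convention: the trailing space is baked into the prefix...
        final String logPrefix = String.format("stream-thread [%s] ", threadClientId);
        // ...so concatenated messages start directly with the text.
        // Prints: stream-thread [example-thread-1] Failed to rebalance.
        System.out.println(logPrefix + "Failed to rebalance.");
    }
}
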
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 2d896fa..7afbecf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -18,10 +18,10 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -36,7 +36,7 @@ class TaskManager {
     // initialize the task list
     // activeTasks needs to be concurrent as it can be accessed
     // by QueryableState
-    private static final Logger log = LoggerFactory.getLogger(TaskManager.class);
+    private final Logger log;
     private final AssignedTasks active;
     private final AssignedTasks standby;
     private final ChangelogReader changelogReader;
@@ -61,14 +61,18 @@ class TaskManager {
         this.standbyTaskCreator = standbyTaskCreator;
         this.active = active;
         this.standby = standby;
+
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.log = logContext.logger(getClass());
     }
 
     void createTasks(final Collection<TopicPartition> assignment) {
         if (threadMetadataProvider == null) {
-            throw new IllegalStateException(logPrefix + " taskIdProvider has not been initialized while adding stream tasks. This should not happen.");
+            throw new IllegalStateException(logPrefix + "taskIdProvider has not been initialized while adding stream tasks. This should not happen.");
         }
         if (consumer == null) {
-            throw new IllegalStateException(logPrefix + " consumer has not been initialized while adding stream tasks. This should not happen.");
+            throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
         }
 
         changelogReader.reset();
@@ -80,7 +84,7 @@ class TaskManager {
         addStreamTasks(assignment);
         addStandbyTasks();
         final Set<TopicPartition> partitions = active.uninitializedPartitions();
-        log.trace("{} pausing partitions: {}", logPrefix, partitions);
+        log.trace("pausing partitions: {}", partitions);
         consumer.pause(partitions);
     }
 
@@ -95,7 +99,7 @@ class TaskManager {
         }
         final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>();
         // collect newly assigned tasks and reopen re-assigned tasks
-        log.debug("{} Adding assigned tasks as active: {}", logPrefix, assignedTasks);
+        log.debug("Adding assigned tasks as active: {}", assignedTasks);
         for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedTasks.entrySet()) {
             final TaskId taskId = entry.getKey();
             final Set<TopicPartition> partitions = entry.getValue();
@@ -106,11 +110,11 @@ class TaskManager {
                         newTasks.put(taskId, partitions);
                     }
                 } catch (final StreamsException e) {
-                    log.error("{} Failed to create an active task {} due to the following error:", logPrefix, taskId, e);
+                    log.error("Failed to create an active task {} due to the following error:", taskId, e);
                     throw e;
                 }
             } else {
-                log.warn("{} Task {} owned partitions {} are not contained in the assignment {}", logPrefix, taskId, partitions, assignment);
+                log.warn("Task {} owned partitions {} are not contained in the assignment {}", taskId, partitions, assignment);
             }
         }
 
@@ -120,7 +124,7 @@ class TaskManager {
 
         // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedTasks(); eventually
-        log.trace("{} New active tasks to be created: {}", logPrefix, newTasks);
+        log.trace("New active tasks to be created: {}", newTasks);
 
         for (final Task task : taskCreator.createTasks(consumer, newTasks)) {
             active.addNewTask(task);
@@ -132,7 +136,7 @@ class TaskManager {
         if (assignedStandbyTasks.isEmpty()) {
             return;
         }
-        log.debug("{} Adding assigned standby tasks {}", logPrefix, assignedStandbyTasks);
+        log.debug("Adding assigned standby tasks {}", assignedStandbyTasks);
         final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>();
         // collect newly assigned standby tasks and reopen re-assigned standby tasks
         for (final Map.Entry<TaskId, Set<TopicPartition>> entry : assignedStandbyTasks.entrySet()) {
@@ -150,7 +154,7 @@ class TaskManager {
 
         // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedStandbyTasks(); eventually
-        log.trace("{} New standby tasks to be created: {}", logPrefix, newStandbyTasks);
+        log.trace("New standby tasks to be created: {}", newStandbyTasks);
 
         for (final Task task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
             standby.addNewTask(task);
@@ -174,8 +178,7 @@ class TaskManager {
      * soon the tasks will be assigned again
      */
     void suspendTasksAndState()  {
-        log.debug("{} Suspending all active tasks {} and standby tasks {}",
-                  logPrefix, active.runningTaskIds(), standby.runningTaskIds());
+        log.debug("Suspending all active tasks {} and standby tasks {}", active.runningTaskIds(), standby.runningTaskIds());
 
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
@@ -185,7 +188,7 @@ class TaskManager {
         firstException.compareAndSet(null, unAssignChangeLogPartitions());
 
         if (firstException.get() != null) {
-            throw new StreamsException(logPrefix + " failed to suspend stream tasks", firstException.get());
+            throw new StreamsException(logPrefix + "failed to suspend stream tasks", firstException.get());
         }
     }
 
@@ -194,15 +197,14 @@ class TaskManager {
             // un-assign the change log partitions
             restoreConsumer.assign(Collections.<TopicPartition>emptyList());
         } catch (final RuntimeException e) {
-            log.error("{} Failed to un-assign change log partitions due to the following error:", logPrefix, e);
+            log.error("Failed to un-assign change log partitions due to the following error:", e);
             return e;
         }
         return null;
     }
 
     void shutdown(final boolean clean) {
-        log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}",
-                  logPrefix, active.runningTaskIds(), standby.runningTaskIds(),
+        log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
                   active.previousTaskIds(), standby.previousTaskIds());
 
         active.close(clean);
@@ -210,7 +212,7 @@ class TaskManager {
         try {
             threadMetadataProvider.close();
         } catch (final Throwable e) {
-            log.error("{} Failed to close KafkaStreamClient due to the following error:", logPrefix, e);
+            log.error("Failed to close KafkaStreamClient due to the following error:", e);
         }
         // remove the changelog partitions from restore consumer
         unAssignChangeLogPartitions();
@@ -256,7 +258,7 @@ class TaskManager {
         final Set<TopicPartition> resumed = active.updateRestored(restored);
 
         if (!resumed.isEmpty()) {
-            log.trace("{} resuming partitions {}", logPrefix, resumed);
+            log.trace("resuming partitions {}", resumed);
             consumer.resume(resumed);
         }
         if (active.allTasksRunning()) {

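TaskManager above shows the hybrid form of the refactor: it still accepts the raw String logPrefix, derives a prefixed Logger from it via LogContext for log statements, and keeps the String itself for hand-built exception messages. A minimal sketch of that shape; the class and method names are illustrative:

import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

class PrefixedComponentSketch {
    private final String logPrefix;
    private final Logger log;

    PrefixedComponentSketch(final String logPrefix) {
        this.logPrefix = logPrefix;
        // Log statements get the prefix automatically from the LogContext.
        this.log = new LogContext(logPrefix).logger(getClass());
    }

    void createTasks(final Object consumer) {
        if (consumer == null) {
            // Exception messages still concatenate the prefix by hand.
            throw new IllegalStateException(logPrefix + "consumer has not been initialized");
        }
        log.debug("Adding assigned tasks as active");
    }
}
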
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 1220c02..aab9671 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -17,11 +17,11 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,9 +37,7 @@ import java.util.NoSuchElementException;
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
 public class ThreadCache {
-    private static final Logger log = LoggerFactory.getLogger(ThreadCache.class);
-
-    private final String logPrefix;
+    private final Logger log;
     private final long maxCacheSizeBytes;
     private final StreamsMetrics metrics;
     private final Map<String, NamedCache> caches = new HashMap<>();
@@ -54,10 +52,10 @@ public class ThreadCache {
         void apply(final List<DirtyEntry> dirty);
     }
 
-    public ThreadCache(final String logPrefix, long maxCacheSizeBytes, final StreamsMetrics metrics) {
-        this.logPrefix = logPrefix;
+    public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetrics metrics) {
         this.maxCacheSizeBytes = maxCacheSizeBytes;
         this.metrics = metrics;
+        this.log = logContext.logger(getClass());
     }
 
     public long puts() {
@@ -129,8 +127,7 @@ public class ThreadCache {
         cache.flush();
 
         if (log.isTraceEnabled()) {
-            log.trace("{} Cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
-                logPrefix, puts(), gets(), evicts(), flushes());
+            log.trace("Cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}", puts(), gets(), evicts(), flushes());
         }
     }
 
@@ -250,7 +247,7 @@ public class ThreadCache {
             numEvicted++;
         }
         if (log.isTraceEnabled()) {
-            log.trace("{} Evicted {} entries from cache {}", logPrefix, numEvicted, namespace);
+            log.trace("Evicted {} entries from cache {}", numEvicted, namespace);
         }
     }
 

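ThreadCache above now takes a LogContext directly rather than a prefix String, which is why the test diffs below construct one inline (for example new LogContext("testCache ")). A hedged usage sketch, assuming the Streams test helper MockStreamsMetrics from the touched tests is available on the classpath:

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;

public class ThreadCacheUsageSketch {
    public static void main(final String[] args) {
        // The cache's trace logging ("Cache stats on flush: ...") now
        // carries this prefix automatically.
        final ThreadCache cache = new ThreadCache(
                new LogContext("testCache "),   // prefix, trailing space
                100_000L,                       // max cache size in bytes
                new MockStreamsMetrics(new Metrics()));
        System.out.println("puts so far: " + cache.puts());
    }
}
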
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 33d55e2..73c5484 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -557,4 +557,5 @@ public class KafkaStreamsTest {
             mapStates.put(newState, prevCount + 1);
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 7f0d28a..21dc4f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -87,7 +88,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
         context = new MockProcessorContext(stateDir,
-            Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()))) {
+            Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 100000, new MockStreamsMetrics(new Metrics()))) {
             @Override
             public <K, V> void forward(final K key, final V value) {
                 results.add(KeyValue.pair(key, value));

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 16e5b39..4dc17c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -161,7 +162,7 @@ public class AbstractProcessorContextTest {
         }
 
         TestProcessorContext(final MockStreamsMetrics metrics) {
-            super(new TaskId(0, 0), "appId", new StreamsConfig(config), metrics, new StateManagerStub(), new ThreadCache("name", 0, metrics));
+            super(new TaskId(0, 0), "appId", new StreamsConfig(config), metrics, new StateManagerStub(), new ThreadCache(new LogContext("name "), 0, metrics));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index d2d439c..02aa0a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -121,7 +122,7 @@ public class AbstractTaskTest {
                                                       Collections.<String, String>emptyMap(),
                                                       Collections.<StateStore>emptyList()),
                                 consumer,
-                                new StoreChangelogReader(consumer, new MockStateRestoreListener()),
+                                new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
                                 false,
                                 stateDirectory,
                                 config) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 7f961af..7d6bb3a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.TaskId;
 import org.easymock.EasyMock;
@@ -52,7 +53,7 @@ public class AssignedTasksTest {
 
     @Before
     public void before() {
-        assignedTasks = new AssignedTasks("log", "task");
+        assignedTasks = new AssignedTasks(new LogContext("log "), "task");
         EasyMock.expect(t1.id()).andReturn(taskId1).anyTimes();
         EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index c56f609..fd9d070 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -108,7 +109,7 @@ public class ProcessorNodeTest {
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
 
         final Metrics metrics = new Metrics();
-        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null), metrics);
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null, new LogContext("processnode-test ")), metrics);
         final ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
         node.init(context);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index fbf45b3..1db2200 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -71,6 +72,7 @@ public class ProcessorStateManagerTest {
     private final byte[] key = new byte[]{0x0, 0x0, 0x0, 0x1};
     private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8"));
     private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(changelogTopic, 0, 0, key, value);
+    private final LogContext logContext = new LogContext("process-state-manager-test ");
 
     private File baseDir;
     private File checkpointFile;
@@ -146,7 +148,8 @@ public class ProcessorStateManagerTest {
                 }
             },
             changelogReader,
-            false);
+            false,
+            logContext);
 
         try {
             stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
@@ -172,7 +175,8 @@ public class ProcessorStateManagerTest {
                 }
             },
             changelogReader,
-            false);
+            false,
+            logContext);
 
         try {
             stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
@@ -220,7 +224,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             storeToChangelogTopic,
             changelogReader,
-            false);
+            false,
+            logContext);
 
         try {
             stateMgr.register(store1, true, store1.stateRestoreCallback);
@@ -252,7 +257,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.<String, String>emptyMap(),
             changelogReader,
-            false);
+            false,
+            logContext);
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -286,7 +292,8 @@ public class ProcessorStateManagerTest {
                 }
             },
             changelogReader,
-            false);
+            false,
+            logContext);
         try {
             // make sure the checkpoint file isn't deleted
             assertTrue(checkpointFile.exists());
@@ -320,7 +327,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.<String, String>emptyMap(),
             changelogReader,
-            false);
+            false,
+            logContext);
         stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
@@ -338,7 +346,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.<String, String>emptyMap(),
             changelogReader,
-            false);
+            false,
+            logContext);
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
         stateMgr.close(null);
         final Map<TopicPartition, Long> read = checkpoint.read();
@@ -354,7 +363,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
             changelogReader,
-            false);
+            false,
+            logContext);
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
         stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));
@@ -371,7 +381,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
             changelogReader,
-            false);
+            false,
+            logContext);
 
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
         final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
@@ -401,7 +412,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.singletonMap(nonPersistentStoreName, nonPersistentStoreTopicName),
             changelogReader,
-            false);
+            false,
+            logContext);
 
         stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
         stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
@@ -419,7 +431,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.<String, String>emptyMap(),
             changelogReader,
-            false);
+            false,
+            logContext);
 
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
@@ -439,7 +452,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.<String, String>emptyMap(),
             changelogReader,
-            false);
+            false,
+            logContext);
 
         try {
             stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), true, null);
@@ -458,7 +472,9 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.<String, String>emptyMap(),
             changelogReader,
-            false);
+            false,
+            logContext);
+
         stateManager.register(mockStateStore, false, null);
 
         try {
@@ -480,7 +496,8 @@ public class ProcessorStateManagerTest {
             stateDirectory,
             Collections.singletonMap(storeName, changelogTopic),
             changelogReader,
-            false);
+            false,
+            logContext);
 
         final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) {
             @Override
@@ -512,7 +529,8 @@ public class ProcessorStateManagerTest {
                 stateDirectory,
                 Collections.<String, String>emptyMap(),
                 changelogReader,
-                true);
+                true,
+                logContext);
 
             assertFalse(checkpointFile.exists());
         } finally {
@@ -534,7 +552,8 @@ public class ProcessorStateManagerTest {
                 }
             },
             changelogReader,
-            false);
+            false,
+            logContext);
     }
 
     private MockStateStoreSupplier.MockStateStore getPersistentStore() {

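Each ProcessorStateManager constructed in the hunks above now takes a trailing logContext argument. The field itself is declared earlier in the test class, outside the hunks shown here; a minimal sketch of what that declaration presumably looks like, with an assumed prefix string, follows:

    import org.apache.kafka.common.utils.LogContext;

    // Assumed declaration; the actual prefix used by
    // ProcessorStateManagerTest is not visible in these hunks.
    private final LogContext logContext = new LogContext("process-state-manager-test ");
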
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 09f46e0..7b2a41e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.junit.Test;
@@ -43,6 +44,8 @@ import static org.junit.Assert.assertEquals;
 
 public class RecordCollectorTest {
 
+    private final LogContext logContext = new LogContext("test ");
+
     private final List<PartitionInfo> infos = Arrays.asList(
             new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
@@ -68,7 +71,7 @@ public class RecordCollectorTest {
 
         final RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
-                "RecordCollectorTest-TestSpecificPartition");
+                "RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition "));
 
         collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
         collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
@@ -100,7 +103,7 @@ public class RecordCollectorTest {
 
         final RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
-                "RecordCollectorTest-TestStreamPartitioner");
+                "RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner "));
 
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
@@ -135,7 +138,8 @@ public class RecordCollectorTest {
                         return super.send(record, callback);
                     }
                 },
-                "test");
+                "test",
+                logContext);
 
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
@@ -152,7 +156,8 @@ public class RecordCollectorTest {
                         throw new TimeoutException();
                     }
                 },
-                "test");
+                "test",
+                logContext);
 
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
@@ -169,7 +174,8 @@ public class RecordCollectorTest {
                         return null;
                     }
                 },
-                "test");
+                "test",
+                logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
@@ -185,7 +191,8 @@ public class RecordCollectorTest {
                         return null;
                     }
                 },
-                "test");
+                "test",
+                logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         collector.flush();
     }
@@ -201,7 +208,8 @@ public class RecordCollectorTest {
                         return null;
                     }
                 },
-                "test");
+                "test",
+                logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         collector.close();
     }
@@ -217,7 +225,8 @@ public class RecordCollectorTest {
                 }
 
             },
-            "test");
+            "test",
+            logContext);
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
 }

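RecordCollectorImpl now receives a LogContext alongside its stream-task id string. A LogContext stamps a fixed prefix onto every message logged through the Logger instances it creates, which is what lets the rest of this commit drop the hand-rolled logPrefix concatenation. A minimal sketch of the mechanism (the message itself is illustrative):

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    LogContext logContext = new LogContext("RecordCollectorTest-TestSpecificPartition ");
    Logger log = logContext.logger(RecordCollectorImpl.class);
    // Logs roughly: "RecordCollectorTest-TestSpecificPartition sent 1 record"
    log.info("sent {} record", 1);
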
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index c6e9eac..c33a9c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -53,7 +54,7 @@ public class RecordQueueTest {
     private final String[] topics = {"topic"};
 
     final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
-            new RecordCollectorImpl(null, null));
+            new RecordCollectorImpl(null, null, new LogContext("record-queue-test ")));
     private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
             mockSourceNodeWithMetrics,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 9da341b..ef99b8a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
@@ -36,7 +37,7 @@ public class SinkNodeTest {
     private final Serializer anySerializer = Serdes.Bytes().serializer();
     private final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
     private final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
-        new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+        new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null, new LogContext("sinknode-test ")));
     private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
 
     @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index fd63bd6..7a7b119 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -133,7 +134,7 @@ public class StandbyTaskTest {
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "));
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 8024cd9..2bb5b7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Before;
@@ -42,6 +43,7 @@ public class StateConsumerTest {
     private final MockTime time = new MockTime();
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final Map<TopicPartition, Long> partitionOffsets = new HashMap<>();
+    private final LogContext logContext = new LogContext("test ");
     private GlobalStreamThread.StateConsumer stateConsumer;
     private StateMaintainerStub stateMaintainer;
 
@@ -50,7 +52,7 @@ public class StateConsumerTest {
         partitionOffsets.put(topicOne, 20L);
         partitionOffsets.put(topicTwo, 30L);
         stateMaintainer = new StateMaintainerStub(partitionOffsets);
-        stateConsumer = new GlobalStreamThread.StateConsumer("test", consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL);
+        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL);
     }
 
     @Test
@@ -107,7 +109,7 @@ public class StateConsumerTest {
 
     @Test
     public void shouldNotFlushWhenFlushIntervalIsZero() {
-        stateConsumer = new GlobalStreamThread.StateConsumer("test", consumer, stateMaintainer, time, 10L, -1);
+        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, -1);
         stateConsumer.initialize();
         time.sleep(100);
         stateConsumer.pollAndUpdate();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c574bbc..5a3fa69 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -51,8 +52,9 @@ public class StoreChangelogReaderTest {
     private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener);
     private final TopicPartition topicPartition = new TopicPartition("topic", 0);
+    private final LogContext logContext = new LogContext("test-reader ");
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
 
     @Before
     public void setUp() {
@@ -70,7 +72,7 @@ public class StoreChangelogReaderTest {
             }
         };
 
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener);
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                 "storeName"));
         changelogReader.restore();

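StoreChangelogReader likewise accepts the LogContext from its owner, presumably deriving its own Logger from it instead of holding a static LoggerFactory logger. A self-contained sketch of that pattern, using an illustrative class name rather than the actual StoreChangelogReader internals:

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    // Illustrative component following the commit's pattern: the Logger
    // comes from the injected LogContext, so every log line carries the
    // owner's prefix without manual concatenation.
    class ExampleRestoreComponent {
        private final Logger log;

        ExampleRestoreComponent(final LogContext logContext) {
            this.log = logContext.logger(ExampleRestoreComponent.class);
        }

        void restore() {
            log.trace("starting restore");
        }
    }
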
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 2b719d1..91dd422 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -114,7 +115,7 @@ public class StreamTaskTest {
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test "));
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
     private final String applicationId = "applicationId";
@@ -124,7 +125,7 @@ public class StreamTaskTest {
     private final MockTime time = new MockTime();
     private File baseDir = TestUtils.tempDirectory();
     private StateDirectory stateDirectory;
-    private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId");
+    private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId", new LogContext("taskId "));
     private StreamsConfig config;
     private StreamsConfig eosConfig;
     private StreamTask task;
@@ -551,7 +552,7 @@ public class StreamTaskTest {
             changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
 
             @Override
-            RecordCollector createRecordCollector() {
+            RecordCollector createRecordCollector(final LogContext logContext) {
                 return new NoOpRecordCollector() {
                     @Override
                     public void flush() {
@@ -605,7 +606,7 @@ public class StreamTaskTest {
             changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
 
             @Override
-            RecordCollector createRecordCollector() {
+            RecordCollector createRecordCollector(final LogContext logContext) {
                 return new NoOpRecordCollector() {
                     @Override
                     public Map<TopicPartition, Long> offsets() {
@@ -672,7 +673,7 @@ public class StreamTaskTest {
             changelogReader, testConfig, streamsMetrics, stateDirectory, null, time, producer) {
 
             @Override
-            RecordCollector createRecordCollector() {
+            RecordCollector createRecordCollector(final LogContext logContext) {
                 return new NoOpRecordCollector() {
                     @Override
                     public Map<TopicPartition, Long> offsets() {

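The createRecordCollector hook on StreamTask now takes the task's LogContext, and the anonymous test subclasses above adjust their overrides accordingly. The production implementation is not shown in these hunks; based on the constructor calls elsewhere in this diff it presumably reads roughly like:

    // Assumed shape of the production factory; only the signature change
    // is visible here. id() and producer are existing StreamTask members.
    RecordCollector createRecordCollector(final LogContext logContext) {
        return new RecordCollectorImpl(producer, id().toString(), logContext);
    }
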
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index c6d12c6..bf433da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -184,7 +185,7 @@ public class KeyValueStoreTestDriver<K, V> {
         final ByteArraySerializer rawSerializer = new ByteArraySerializer();
         final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
-        final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
+        final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver", new LogContext("KeyValueStoreTestDriver ")) {
             @Override
             public <K1, V1> void send(final String topic,
                                       final K1 key,
@@ -226,7 +227,7 @@ public class KeyValueStoreTestDriver<K, V> {
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);
 
         context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
-            ThreadCache cache = new ThreadCache("testCache", 1024 * 1024L, metrics());
+            ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics());
 
             @Override
             public ThreadCache getCache() {

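The ThreadCache change in KeyValueStoreTestDriver above repeats one-for-one in each of the caching- and change-logging-store tests that follow: the old cache name string becomes a LogContext. A hedged usage sketch matching those call sites:

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
    import org.apache.kafka.streams.state.internals.ThreadCache;

    // First argument is now a LogContext rather than a bare cache name.
    final ThreadCache cache = new ThreadCache(
        new LogContext("testCache "),        // was: "testCache"
        10 * 1024 * 1024L,                   // max cache size in bytes
        new MockStreamsMetrics(new Metrics()));
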
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index a4a4bd7..73cdb25 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
@@ -69,7 +70,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         cacheFlushListener = new CacheFlushListenerStub<>();
         store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
         store.setFlushListener(cacheFlushListener);
-        cache = new ThreadCache("testCache", maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
+        cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache);
         topic = "topic";
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 14ac52c..db19294 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -74,7 +75,7 @@ public class CachingSessionStoreTest {
                                                  Serdes.String(),
                                                  Segments.segmentInterval(retention, numSegments)
                                                  );
-        cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
+        cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
         cachingStore.init(context, cachingStore);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 2621927..ea2c47e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -77,7 +78,7 @@ public class CachingWindowStoreTest {
                                                 WINDOW_SIZE,
                                                 Segments.segmentInterval(retention, numSegments));
         cachingStore.setFlushListener(cacheListener);
-        cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
+        cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
         context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic));

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 613dbf6..cf07927 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.test.MockProcessorContext;
@@ -68,7 +69,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
             Serdes.String(),
             Serdes.Long(),
             collector,
-            new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         context.setTime(0);
         store.init(context, store);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
index 6f502c6..8190fd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -76,7 +77,7 @@ public class ChangeLoggingKeyValueStoreTest {
             Serdes.String(),
             Serdes.Long(),
             collector,
-            new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         context.setTime(0);
         store.init(context, store);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
index 3e579bc..edc0b94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -39,7 +40,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest {
     @Before
     public void setUp() throws Exception {
         store = new InMemoryKeyValueStore<>(namespace, Serdes.Bytes(), Serdes.ByteArray());
-        cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
+        cache = new ThreadCache(new LogContext("testCache "), 10000L, new MockStreamsMetrics(new Metrics()));
     }
 
     @Test
@@ -146,7 +147,7 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest {
     @Test
     public void shouldPeekNextKey() throws Exception {
         final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray());
-        final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics()));
         byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
         for (int i = 0; i < bytes.length - 1; i += 2) {
             kv.put(Bytes.wrap(bytes[i]), bytes[i]);

