kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-4023: Add thread id and task id for logging prefix in Streams
Date Tue, 06 Sep 2016 18:38:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a960faf5f -> ed639e826


KAFKA-4023: Add thread id and task id for logging prefix in Streams

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1803 from bbejeck/KAFKA-4023_add_thread_id_prefix_for_logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed639e82
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed639e82
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed639e82

Branch: refs/heads/trunk
Commit: ed639e8263d9409a9836a55938174122d6ff3ffa
Parents: a960faf
Author: Bill Bejeck <bbejeck@gmail.com>
Authored: Tue Sep 6 11:38:53 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Sep 6 11:38:53 2016 -0700

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java        | 22 ++++-----
 .../processor/internals/RecordCollector.java    |  7 ++-
 .../processor/internals/StandbyTask.java        |  7 +++
 .../internals/StreamPartitionAssignor.java      | 30 ++++++------
 .../streams/processor/internals/StreamTask.java | 12 ++---
 .../processor/internals/StreamThread.java       | 51 ++++++++++----------
 .../internals/assignment/TaskAssignor.java      | 10 ++--
 .../internals/RecordCollectorTest.java          |  8 +--
 .../internals/assignment/TaskAssignorTest.java  | 24 ++++-----
 .../streams/state/KeyValueStoreTestDriver.java  |  2 +-
 .../state/internals/RocksDBWindowStoreTest.java | 20 ++++----
 .../state/internals/StateStoreTestUtils.java    |  2 +-
 .../state/internals/StoreChangeLoggerTest.java  |  2 +-
 .../apache/kafka/test/KStreamTestDriver.java    |  2 +-
 14 files changed, 105 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
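
The change is mechanical: each log and exception message gains a "task [<taskId>]" or "stream-thread [<threadName>]" prefix, using SLF4J parameterized logging for log calls and String.format for exception messages. A minimal sketch of the convention, not part of the patch (class name and ids are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PrefixedLoggingSketch {
        private static final Logger log = LoggerFactory.getLogger(PrefixedLoggingSketch.class);

        public static void main(String[] args) {
            String taskId = "0_1";                 // illustrative task id
            String threadName = "StreamThread-1";  // illustrative thread name

            // Parameterized form, used for log statements in the patch
            log.debug("task [{}] Flushing stores.", taskId);
            log.info("stream-thread [{}] Creating producer client", threadName);

            // String.format form, used for exception messages in the patch
            IllegalStateException e = new IllegalStateException(
                    String.format("task [%s] Current node is null", taskId));
            log.error(e.getMessage(), e);
        }
    }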


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 11c61a9..8aeeb62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -89,7 +89,7 @@ public class ProcessorStateManager {
         this.sourceStoreToSourceTopic = sourceStoreToSourceTopic;
 
         if (!stateDirectory.lock(taskId, 5)) {
-            throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
+            throw new IOException(String.format("task [%s] Failed to lock the state directory: %s", taskId, baseDir.getCanonicalPath()));
         }
 
         // load the checkpoint information
@@ -117,11 +117,11 @@ public class ProcessorStateManager {
     public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
 
         if (store.name().equals(CHECKPOINT_FILE_NAME)) {
-            throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
+            throw new IllegalArgumentException(String.format("task [%s]  Illegal store name: %s", taskId, CHECKPOINT_FILE_NAME));
         }
 
         if (this.stores.containsKey(store.name())) {
-            throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
+            throw new IllegalArgumentException(String.format("task [%s]  Store %s has already been registered.", taskId, store.name()));
         }
 
         if (loggingEnabled) {
@@ -135,7 +135,7 @@ public class ProcessorStateManager {
         } else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) {
             topic = sourceStoreToSourceTopic.get(store.name());
         } else {
-            throw new IllegalArgumentException("Store is neither built from source topic, nor has a changelog.");
+            throw new IllegalArgumentException(String.format("task [%s]  Store is neither built from source topic, nor has a changelog.", taskId));
         }
 
         // block until the partition is ready for this state changelog topic or time has elapsed
@@ -153,7 +153,7 @@ public class ProcessorStateManager {
 
             List<PartitionInfo> partitionInfos = restoreConsumer.partitionsFor(topic);
             if (partitionInfos == null) {
-                throw new StreamsException("Could not find partition info for topic: " + topic);
+                throw new StreamsException(String.format("task [%s]  Could not find partition info for topic: %s", taskId, topic));
             }
             for (PartitionInfo partitionInfo : partitionInfos) {
                 if (partitionInfo.partition() == partition) {
@@ -164,7 +164,7 @@ public class ProcessorStateManager {
         } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
 
         if (partitionNotFound)
-            throw new StreamsException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition);
+            throw new StreamsException(String.format("task [%s]  Store %s's change log (%s) does not contain partition %s", taskId, store.name(), topic, partition));
 
         this.stores.put(store.name(), store);
 
@@ -181,7 +181,7 @@ public class ProcessorStateManager {
 
         // subscribe to the store's partition
         if (!restoreConsumer.subscription().isEmpty()) {
-            throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
+            throw new IllegalStateException(String.format("task [%s]  Restore consumer should have not subscribed to any partitions beforehand", taskId));
         }
         TopicPartition storePartition = new TopicPartition(topicName, getPartition(topicName));
         restoreConsumer.assign(Collections.singletonList(storePartition));
@@ -217,7 +217,7 @@ public class ProcessorStateManager {
                 } else if (restoreConsumer.position(storePartition) > endOffset) {
                     // For a logging enabled changelog (no offset limit),
                     // the log end offset should not change while restoring since it is only written by this thread.
-                    throw new IllegalStateException("Log end offset should not change while restoring");
+                    throw new IllegalStateException(String.format("task [%s] Log end offset should not change while restoring", taskId));
                 }
             }
 
@@ -290,7 +290,7 @@ public class ProcessorStateManager {
 
     public void flush() {
         if (!this.stores.isEmpty()) {
-            log.debug("Flushing stores.");
+            log.debug("task [{}] Flushing stores.", taskId);
             for (StateStore store : this.stores.values())
                 store.flush();
         }
@@ -304,9 +304,9 @@ public class ProcessorStateManager {
             // attempting to flush and close the stores, just in case they
             // are not closed by a ProcessorNode yet
             if (!stores.isEmpty()) {
-                log.debug("Closing stores.");
+                log.debug("task [{}] Closing stores.", taskId);
                 for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
-                    log.debug("Closing storage engine {}", entry.getKey());
+                    log.debug("task [{}} Closing storage engine {}", taskId, entry.getKey());
                     entry.getValue().flush();
                     entry.getValue().close();
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index fea616f..3b53be7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -49,10 +49,12 @@ public class RecordCollector {
 
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
+    private final String streamTaskId;
 
-    public RecordCollector(Producer<byte[], byte[]> producer) {
+    public RecordCollector(Producer<byte[], byte[]> producer, String streamTaskId) {
         this.producer = producer;
         this.offsets = new HashMap<>();
+        this.streamTaskId = streamTaskId;
     }
 
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
@@ -81,7 +83,8 @@ public class RecordCollector {
                     TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
                     offsets.put(tp, metadata.offset());
                 } else {
-                    log.error("Error sending record to topic {}", topic, exception);
+                    String prefix = String.format("task [%s]", streamTaskId);
+                    log.error("{} Error sending record to topic {}", prefix, topic, exception);
                 }
             }
         });
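
RecordCollector now takes the owning task's id as a second constructor argument, and every construction site in the rest of the patch passes one. A construction sketch under that new signature, mirroring the MockProducer setup in the tests below (the task id string is illustrative):

    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.processor.internals.RecordCollector;

    public class RecordCollectorSketch {
        public static void main(String[] args) {
            ByteArraySerializer rawSerializer = new ByteArraySerializer();
            Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
            // "0_1" stands in for the task id, passed as id().toString() in StreamTask
            RecordCollector collector = new RecordCollector(producer, "0_1");
        }
    }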

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 08b4f07..a22bea9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -34,6 +36,7 @@ import java.util.Map;
  */
 public class StandbyTask extends AbstractTask {
 
+    private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
     private final Map<TopicPartition, Long> checkpointedOffsets;
 
     /**
@@ -58,6 +61,8 @@ public class StandbyTask extends AbstractTask {
                        StreamsMetrics metrics, final StateDirectory stateDirectory) {
         super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory);
 
+        log.info("task [{}] Creating processorContext", id());
+
         // initialize the topology with its own context
         this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
 
@@ -81,10 +86,12 @@ public class StandbyTask extends AbstractTask {
      * @return a list of records not consumed
      */
     public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
+        log.debug("task [{}] updates for partition [{}]", id(), partition);
         return stateMgr.updateStandbyStates(partition, records);
     }
 
     public void commit() {
+        log.debug("task [{}] flushing", id());
         stateMgr.flush();
 
         // reinitialize offset limits

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 09e192d..bb8379c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -117,7 +117,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         if (!(o instanceof StreamThread)) {
-            KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
+            KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), StreamThread.class.getName()));
             log.error(ex.getMessage(), ex);
             throw ex;
         }
@@ -129,16 +129,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         if (userEndPoint != null && !userEndPoint.isEmpty()) {
             final String[] hostPort = userEndPoint.split(":");
             if (hostPort.length != 2) {
-                throw new ConfigException(String.format("Config %s isn't in the correct format. Expected a host:port pair" +
+                throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
                                                        " but received %s",
-                                                        StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+                        streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
             } else {
                 try {
                     Integer.valueOf(hostPort[1]);
                     this.userEndPointConfig = userEndPoint;
                 } catch (NumberFormatException nfe) {
-                    throw new ConfigException(String.format("Invalid port %s supplied in %s for config %s",
-                                                           hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
+                    throw new ConfigException(String.format("stream-thread [%s] Invalid port %s supplied in %s for config %s",
+                            streamThread.getName(), hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
                 }
             }
 
@@ -149,7 +149,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
                     configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1);
         } else {
-            log.info("Config '{}' isn't supplied and hence no internal topics will be created.", StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
+            log.info("stream-thread [{}] Config '{}' isn't supplied and hence no internal topics will be created.",  streamThread.getName(), StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
         }
     }
 
@@ -189,7 +189,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
         // if ZK is specified, prepare the internal source topic before calling partition grouper
         if (internalTopicManager != null) {
-            log.debug("Starting to validate internal topics in partition assignor.");
+            log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
 
             for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
                 String topic = entry.getKey();
@@ -220,7 +220,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition);
             }
 
-            log.info("Completed validating internal topics in partition assignor.");
+            log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
         } else {
             List<String> missingTopics = new ArrayList<>();
             for (String topic : topicToTaskIds.keySet()) {
@@ -230,8 +230,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 }
             }
             if (!missingTopics.isEmpty()) {
-                log.warn("Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
-                         missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
+                log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
+                        streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
 
             }
         }
@@ -389,7 +389,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         // assign tasks to clients
-        states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
+        states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas, streamThread.getName());
 
         final List<AssignmentSupplier> assignmentSuppliers = new ArrayList<>();
 
@@ -523,8 +523,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 taskIds.add(iter.next());
             } else {
                 TaskAssignmentException ex = new TaskAssignmentException(
-                        "failed to find a task id for the partition=" + partition.toString() +
-                        ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString()
+                        String.format("stream-thread [%s] failed to find a task id for the partition=%s" +
+                        ", partitions=%d, assignmentInfo=%s", streamThread.getName(), partition.toString(), partitions.size(), info.toString())
                 );
                 log.error(ex.getMessage(), ex);
                 throw ex;
@@ -581,14 +581,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
 
                 if (infos == null)
-                    throw new TopologyBuilderException("External source topic not found: " + topic);
+                    throw new TopologyBuilderException(String.format("stream-thread [%s] External source topic not found: %s", streamThread.getName(), topic));
 
                 if (numPartitions == -1) {
                     numPartitions = infos.size();
                 } else if (numPartitions != infos.size()) {
                     String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                     Arrays.sort(topics);
-                    throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
+                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not copartitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 18b7646..18ca0ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -105,9 +105,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
         this.consumedOffsets = new HashMap<>();
 
         // create the record recordCollector that maintains the produced offsets
-        this.recordCollector = new RecordCollector(producer);
+        this.recordCollector = new RecordCollector(producer, id().toString());
 
-        log.info("Creating restoration consumer client for stream task #" + id());
+        log.info("task [{}] Creating restoration consumer client", id());
 
         // initialize the topology with its own context
         this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
@@ -169,11 +169,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
             this.currNode = recordInfo.node();
             TopicPartition partition = recordInfo.partition();
 
-            log.debug("Start processing one record [{}]", currRecord);
+            log.debug("task [{}] Start processing one record [{}]", id(), currRecord);
 
             this.currNode.process(currRecord.key(), currRecord.value());
 
-            log.debug("Completed processing one record [{}]", currRecord);
+            log.debug("task [{}] Completed processing one record [{}]", id(), currRecord);
 
             // update the consumed offset map after processing is done
             consumedOffsets.put(partition, currRecord.offset());
@@ -222,7 +222,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
     @Override
     public void punctuate(ProcessorNode node, long timestamp) {
         if (currNode != null)
-            throw new IllegalStateException("Current node is not null");
+            throw new IllegalStateException(String.format("task [%s] Current node is not null", id()));
 
         currNode = node;
         currRecord = new StampedRecord(DUMMY_RECORD, timestamp);
@@ -291,7 +291,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     public void schedule(long interval) {
         if (currNode == null)
-            throw new IllegalStateException("Current node is null");
+            throw new IllegalStateException(String.format("task [%s] Current node is null", id()));
 
         punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c0e54b9..d8f6003 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -162,12 +162,12 @@ public class StreamThread extends Thread {
         // set the producer and consumer clients
         String threadName = getName();
         threadClientId = clientId + "-" + threadName;
-        log.info("Creating producer client for stream thread [{}]", threadName);
+        log.info("stream-thread [{}] Creating producer client", threadName);
         this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
-        log.info("Creating consumer client for stream thread [{}]", threadName);
+        log.info("stream-thread [{}] Creating consumer client", threadName);
         this.consumer = clientSupplier.getConsumer(
                 config.getConsumerConfigs(this, applicationId, threadClientId));
-        log.info("Creating restore consumer client for stream thread [{}]", threadName);
+        log.info("stream-thread [{}] Creating restore consumer client", threadName);
         this.restoreConsumer = clientSupplier.getRestoreConsumer(
                 config.getRestoreConsumerConfigs(threadClientId));
 
@@ -210,7 +210,7 @@ public class StreamThread extends Thread {
      */
     @Override
     public void run() {
-        log.info("Starting stream thread [" + this.getName() + "]");
+        log.info("Starting stream thread [{}]", this.getName());
 
         try {
             runLoop();
@@ -220,7 +220,7 @@ public class StreamThread extends Thread {
         } catch (Exception e) {
             // we have caught all Kafka related exceptions, and other runtime exceptions
             // should be due to user application errors
-            log.error("Streams application error during processing in thread [" + this.getName() + "]: ", e);
+            log.error("stream-thread [{}] Streams application error during processing: ", this.getName(),  e);
             throw e;
         } finally {
             shutdown();
@@ -239,7 +239,7 @@ public class StreamThread extends Thread {
     }
 
     private void shutdown() {
-        log.info("Shutting down stream thread [" + this.getName() + "]");
+        log.info("Shutting down stream-thread [{}]", this.getName());
 
         // Exceptions should not prevent this call from going through all shutdown steps
         try {
@@ -258,22 +258,22 @@ public class StreamThread extends Thread {
         try {
             producer.close();
         } catch (Throwable e) {
-            log.error("Failed to close producer in thread [" + this.getName() + "]: ", e);
+            log.error("stream-thread [{}] Failed to close producer: ", this.getName(), e);
         }
         try {
             consumer.close();
         } catch (Throwable e) {
-            log.error("Failed to close consumer in thread [" + this.getName() + "]: ", e);
+            log.error("stream-thread [{}] Failed to close consumer: ", this.getName(), e);
         }
         try {
             restoreConsumer.close();
         } catch (Throwable e) {
-            log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
+            log.error("stream-thread [{}] Failed to close restore consumer: ", this.getName(), e);
         }
 
         removeStreamTasks();
 
-        log.info("Stream thread shutdown complete [" + this.getName() + "]");
+        log.info("stream-thread [{}] Stream thread shutdown complete", this.getName());
     }
 
     /**
@@ -317,7 +317,7 @@ public class StreamThread extends Thread {
                 lastPoll = time.milliseconds();
 
                 if (rebalanceException != null)
-                    throw new StreamsException("Failed to rebalance", rebalanceException);
+                    throw new StreamsException(String.format("stream-thread [%s] Failed to rebalance", this.getName()), rebalanceException);
 
                 if (!records.isEmpty()) {
                     for (TopicPartition partition : records.partitions()) {
@@ -406,8 +406,8 @@ public class StreamThread extends Thread {
                     StandbyTask task = standbyTasksByPartition.get(partition);
 
                     if (task == null) {
-                        log.error("missing standby task for partition {}", partition);
-                        throw new StreamsException("missing standby task for partition " + partition);
+                        log.error("stream-thread [{}]  missing standby task for partition {} ", this.getName(), partition);
+                        throw new StreamsException(String.format("stream-thread [%s] missing standby task for partition %s", this.getName(), partition));
                     }
 
                     List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
@@ -422,7 +422,7 @@ public class StreamThread extends Thread {
 
     private boolean stillRunning() {
         if (!running.get()) {
-            log.debug("Shutting down at user request.");
+            log.debug("stream-thread [{}] Shutting down at user request", this.getName());
             return false;
         }
 
@@ -437,7 +437,7 @@ public class StreamThread extends Thread {
                 sensors.punctuateTimeSensor.record(computeLatency());
 
         } catch (KafkaException e) {
-            log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            log.error("stream-thread [{}] Failed to punctuate active task #{}", this.getName(), task.id(), e);
             throw e;
         }
     }
@@ -449,7 +449,7 @@ public class StreamThread extends Thread {
         long now = time.milliseconds();
 
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
-            log.trace("Committing processor instances because the commit interval has elapsed.");
+            log.trace("stream-thread [{}] Committing processor instances because the commit interval has elapsed", this.getName());
 
             commitAll();
             lastCommitMs = now;
@@ -490,10 +490,10 @@ public class StreamThread extends Thread {
             task.commit();
         } catch (CommitFailedException e) {
             // commit failed. Just log it.
-            log.warn("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            log.warn("stream-thread [{}] Failed to commit {} #{}", this.getName(), task.getClass().getSimpleName(), task.id(), e);
         } catch (KafkaException e) {
             // commit failed due to an unexpected exception. Log it and rethrow the exception.
-            log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            log.error("stream-thread [{}] Failed to commit {} #{}", this.getName(), task.getClass().getSimpleName(), task.id(), e);
             throw e;
         }
 
@@ -549,7 +549,7 @@ public class StreamThread extends Thread {
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
         if (partitionAssignor == null)
-            throw new IllegalStateException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
+            throw new IllegalStateException(String.format("stream-thread [%s] Partition assignor has not been initialized while adding stream tasks: this should not happen.", this.getName()));
 
         HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
 
@@ -577,7 +577,7 @@ public class StreamThread extends Thread {
                 for (TopicPartition partition : partitions)
                     activeTasksByPartition.put(partition, task);
             } catch (StreamsException e) {
-                log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", e);
+                log.error("stream-thread [{}] Failed to create an active task #{}", this.getName(), taskId, e);
                 throw e;
             }
         }
@@ -595,16 +595,16 @@ public class StreamThread extends Thread {
             activeTasksByPartition.clear();
 
         } catch (Exception e) {
-            log.error("Failed to remove stream tasks in thread [" + this.getName() + "]: ", e);
+            log.error("stream-thread [{}] Failed to remove stream tasks", this.getName(), e);
         }
     }
 
     private void closeOne(AbstractTask task) {
-        log.info("Removing a task {}", task.id());
+        log.info("stream-thread [{}] Removing a task {}", this.getName(), task.id());
         try {
             task.close();
         } catch (StreamsException e) {
-            log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            log.error("stream-thread [{}] Failed to close a {}  #{}", this.getName(), task.getClass().getSimpleName(), task.id(), e);
         }
         sensors.taskDestructionSensor.record();
     }
@@ -623,7 +623,7 @@ public class StreamThread extends Thread {
 
     private void addStandbyTasks() {
         if (partitionAssignor == null)
-            throw new IllegalStateException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
+            throw new IllegalStateException(String.format("stream-thread [%s] Partition assignor has not been initialized while adding standby tasks: this should not happen.", this.getName()));
 
         Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
@@ -669,6 +669,7 @@ public class StreamThread extends Thread {
     public String toString() {
         StringBuilder sb = new StringBuilder("StreamsThread appId:" + this.applicationId + "\n");
         sb.append("\tStreamsThread clientId:" + clientId + "\n");
+        sb.append("\tStreamsThread threadId:" + this.getName() + "\n");
 
         // iterate and print active tasks
         if (activeTasks != null) {
@@ -706,7 +707,7 @@ public class StreamThread extends Thread {
             restoreConsumer.assign(Collections.<TopicPartition>emptyList());
 
         } catch (Exception e) {
-            log.error("Failed to remove standby tasks in thread [" + this.getName() + "]: ", e);
+            log.error("Failed to remove standby tasks in thread [{}]: ", this.getName(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index e246c4b..fadb43f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -33,23 +33,23 @@ public class TaskAssignor<C, T extends Comparable<T>> {
 
     private static final Logger log = LoggerFactory.getLogger(TaskAssignor.class);
 
-    public static <C, T extends Comparable<T>> Map<C, ClientState<T>> assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas) {
+    public static <C, T extends Comparable<T>> Map<C, ClientState<T>> assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas, String streamThreadId) {
         long seed = 0L;
         for (C client : states.keySet()) {
             seed += client.hashCode();
         }
 
         TaskAssignor<C, T> assignor = new TaskAssignor<>(states, tasks, seed);
-        log.info("Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " +
-            "prevClientsUnchangeed: {}, tasks: {}, replicas: {}",
-            states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged,
+        log.info("stream-thread [{}] Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " +
+            "prevClientsUnchanged: {}, tasks: {}, replicas: {}",
+            streamThreadId, states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged,
             tasks, numStandbyReplicas);
 
         assignor.assignTasks();
         if (numStandbyReplicas > 0)
             assignor.assignStandbyTasks(numStandbyReplicas);
 
-        log.info("Assigned with: " + assignor.states);
+        log.info("stream-thread [{}] Assigned with: {}", streamThreadId, assignor.states);
         return assignor.states;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index b1a4a02..8d5a549 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -65,8 +65,8 @@ public class RecordCollectorTest {
     public void testSpecificPartition() {
 
         RecordCollector collector = new RecordCollector(
-                new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer)
-        );
+                new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
+                "RecordCollectorTest-TestSpecificPartition");
 
         collector.send(new ProducerRecord<>("topic1", 0, "999", "0"), stringSerializer, stringSerializer);
         collector.send(new ProducerRecord<>("topic1", 0, "999", "0"), stringSerializer, stringSerializer);
@@ -97,8 +97,8 @@ public class RecordCollectorTest {
     public void testStreamPartitioner() {
 
         RecordCollector collector = new RecordCollector(
-                new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer)
-        );
+                new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
+                "RecordCollectorTest-TestStreamPartitioner");
 
         collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
         collector.send(new ProducerRecord<>("topic1", "9", "0"), stringSerializer, stringSerializer, streamPartitioner);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
index 28364ab..4333087 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
@@ -46,7 +46,7 @@ public class TaskAssignorTest {
 
         // # of clients and # of tasks are equal.
         tasks = mkSet(0, 1, 2, 3, 4, 5);
-        assignments = TaskAssignor.assign(states, tasks, 0);
+        assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
         numActiveTasks = 0;
         numAssignedTasks = 0;
         for (ClientState<Integer> assignment : assignments.values()) {
@@ -60,7 +60,7 @@ public class TaskAssignorTest {
 
         // # of clients < # of tasks
         tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
-        assignments = TaskAssignor.assign(states, tasks, 0);
+        assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
         numActiveTasks = 0;
         numAssignedTasks = 0;
         for (ClientState<Integer> assignment : assignments.values()) {
@@ -76,7 +76,7 @@ public class TaskAssignorTest {
 
         // # of clients > # of tasks
         tasks = mkSet(0, 1, 2, 3);
-        assignments = TaskAssignor.assign(states, tasks, 0);
+        assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
         numActiveTasks = 0;
         numAssignedTasks = 0;
         for (ClientState<Integer> assignment : assignments.values()) {
@@ -108,7 +108,7 @@ public class TaskAssignorTest {
         // 1 standby replicas.
         numActiveTasks = 0;
         numAssignedTasks = 0;
-        assignments = TaskAssignor.assign(states, tasks, 1);
+        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
         for (ClientState<Integer> assignment : assignments.values()) {
             numActiveTasks += assignment.activeTasks.size();
             numAssignedTasks += assignment.assignedTasks.size();
@@ -122,7 +122,7 @@ public class TaskAssignorTest {
         tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
 
         // 1 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 1);
+        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
         numActiveTasks = 0;
         numAssignedTasks = 0;
         for (ClientState<Integer> assignment : assignments.values()) {
@@ -140,7 +140,7 @@ public class TaskAssignorTest {
         tasks = mkSet(0, 1, 2, 3);
 
         // 1 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 1);
+        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
         numActiveTasks = 0;
         numAssignedTasks = 0;
         for (ClientState<Integer> assignment : assignments.values()) {
@@ -158,7 +158,7 @@ public class TaskAssignorTest {
         tasks = mkSet(0, 1);
 
         // 1 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 1);
+        assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
         numActiveTasks = 0;
         numAssignedTasks = 0;
         for (ClientState<Integer> assignment : assignments.values()) {
@@ -173,7 +173,7 @@ public class TaskAssignorTest {
         assertEquals(tasks.size() * 2, numAssignedTasks);
 
         // 2 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 2);
+        assignments = TaskAssignor.assign(states, tasks, 2, "TaskAssignorTest-TestAssignWithStandby");
         numActiveTasks = 0;
         numAssignedTasks = 0;
         for (ClientState<Integer> assignment : assignments.values()) {
@@ -187,7 +187,7 @@ public class TaskAssignorTest {
         assertEquals(tasks.size() * 3, numAssignedTasks);
 
         // 3 standby replicas.
-        assignments = TaskAssignor.assign(states, tasks, 3);
+        assignments = TaskAssignor.assign(states, tasks, 3, "TaskAssignorTest-TestAssignWithStandby");
         numActiveTasks = 0;
         numAssignedTasks = 0;
         for (ClientState<Integer> assignment : assignments.values()) {
@@ -220,7 +220,7 @@ public class TaskAssignorTest {
             state.prevAssignedTasks.add(task);
             states.put(i++, state);
         }
-        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0);
+        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0, "TaskAssignorTest-TestStickiness");
         for (int client : states.keySet()) {
             Set<Integer> oldActive = states.get(client).prevActiveTasks;
             Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
@@ -244,7 +244,7 @@ public class TaskAssignorTest {
             }
             states.put(i++, state);
         }
-        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0);
+        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0, "TaskAssignorTest-TestStickiness");
         for (int client : states.keySet()) {
             Set<Integer> oldActive = states.get(client).prevActiveTasks;
             Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
@@ -266,7 +266,7 @@ public class TaskAssignorTest {
             state.prevAssignedTasks.addAll(taskSet);
             states.put(i++, state);
         }
-        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0);
+        assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0, "TaskAssignorTest-TestStickiness");
         for (int client : states.keySet()) {
             Set<Integer> oldActive = states.get(client).prevActiveTasks;
             Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 5519ab4..140ea35 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -195,7 +195,7 @@ public class KeyValueStoreTestDriver<K, V> {
         ByteArraySerializer rawSerializer = new ByteArraySerializer();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
-        this.recordCollector = new RecordCollector(producer) {
+        this.recordCollector = new RecordCollector(producer, "KeyValueStoreTestDriver") {
             @SuppressWarnings("unchecked")
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 9a6a260..84c0320 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -75,7 +75,7 @@ public class RocksDBWindowStoreTest {
     public void shouldOnlyIterateOpenSegments() throws Exception {
         final File baseDir = TestUtils.tempDirectory();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-        RecordCollector recordCollector = new RecordCollector(producer) {
+        RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
             }
@@ -116,7 +116,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer) {
+            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -212,7 +212,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer) {
+            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -308,7 +308,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer) {
+            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -404,7 +404,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer) {
+            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -463,7 +463,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer) {
+            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRolling") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -580,7 +580,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer) {
+            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestore") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -629,7 +629,7 @@ public class RocksDBWindowStoreTest {
         File baseDir2 = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer) {
+            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestoreII") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -684,7 +684,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer) {
+            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     // do nothing
@@ -787,7 +787,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer) {
+            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestInitialLoading") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     // do nothing

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index e30c7ae..ec5d841 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -45,7 +45,7 @@ public class StateStoreTestUtils {
 
     static class NoOpRecordCollector extends RecordCollector {
         public NoOpRecordCollector() {
-            super(null);
+            super(null, "StateStoreTestUtils");
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 09f12fb..19cd8e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -42,7 +42,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> written = new HashMap<>();
 
     private final ProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
-            new RecordCollector(null) {
+            new RecordCollector(null, "StoreChangeLoggerTest") {
                 @SuppressWarnings("unchecked")
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 3901d3a..ccc9cb1 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -203,7 +203,7 @@ public class KStreamTestDriver {
 
     private class MockRecordCollector extends RecordCollector {
         public MockRecordCollector() {
-            super(null);
+            super(null, "KStreamTestDriver");
         }
 
         @Override

