kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4722: Add application.id to StreamThread name
Date Wed, 08 Mar 2017 19:40:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c6bccddb9 -> 29084a9b2


KAFKA-4722: Add application.id to StreamThread name

Add application.id to StreamThread name

Author: sharad.develop <sharad.develop@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2617 from sharad-develop/KAFKA-4722


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29084a9b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29084a9b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29084a9b

Branch: refs/heads/trunk
Commit: 29084a9b27b959532813141591ce03be2921e860
Parents: c6bccdd
Author: sharad.develop <sharad.develop@gmail.com>
Authored: Wed Mar 8 11:40:40 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Mar 8 11:40:40 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/StreamThread.java     | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/29084a9b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d447824..9a2c3fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -236,9 +236,8 @@ public class StreamThread extends Thread {
                         Time time,
                         StreamsMetadataState streamsMetadataState,
                         final long cacheSizeBytes) {
-        super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
+        super(clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
         this.applicationId = applicationId;
-        String threadName = getName();
         this.config = config;
         this.builder = builder;
         this.sourceTopicPattern = builder.sourceTopicPattern();
@@ -246,16 +245,16 @@ public class StreamThread extends Thread {
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
         this.streamsMetadataState = streamsMetadataState;
-        threadClientId = clientId + "-" + threadName;
+        threadClientId = getName();
         this.streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId,
             Collections.singletonMap("client-id", threadClientId));
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
-            log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadName);
+            log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadClientId);
         }
         this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics);
 
 
-        this.logPrefix = String.format("stream-thread [%s]", threadName);
+        this.logPrefix = String.format("stream-thread [%s]", threadClientId);
 
         // set the producer and consumer clients
         log.info("{} Creating producer client", logPrefix);


Mime
View raw message