kafka-commits mailing list archives

From: ij...@apache.org
Subject: kafka git commit: KAFKA-5763; Use LogContext in NetworkClient, Selector and broker
Date: Mon, 11 Sep 2017 14:38:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 779714c08 -> 9bb06c3ee


KAFKA-5763; Use LogContext in NetworkClient, Selector and broker

Author: Andrey Dyachkov <andrey.dyachkov@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3761 from adyach/kafka-5763
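
In short: the commit replaces the static LoggerFactory.getLogger(...) fields in NetworkClient and Selector with per-instance loggers obtained from a LogContext, so every log line carries the owning component's identity (for example the AdminClient's clientId). Below is a minimal sketch of the pattern; ExampleClient is a hypothetical class for illustration, while LogContext, its String constructor, and logger(Class) are taken from the diff that follows:

----------------------------------------------------------------------
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class ExampleClient {
    private final Logger log;

    public ExampleClient(LogContext logContext) {
        // logger(Class) returns an slf4j Logger that prepends the
        // context's prefix to every message it emits
        this.log = logContext.logger(ExampleClient.class);
    }

    public void connect(String node) {
        // with the prefix constructed below, this line renders roughly as:
        // "[AdminClient clientId=admin-1] Initiating connection to node 0"
        log.debug("Initiating connection to node {}", node);
    }

    public static void main(String[] args) {
        LogContext logContext = new LogContext("[AdminClient clientId=admin-1] ");
        new ExampleClient(logContext).connect("0");
    }
}
----------------------------------------------------------------------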


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9bb06c3e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9bb06c3e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9bb06c3e

Branch: refs/heads/trunk
Commit: 9bb06c3eef3ebdff77e5e3b98c26b86c40efc56c
Parents: 779714c
Author: Andrey Dyachkov <andrey.dyachkov@gmail.com>
Authored: Mon Sep 11 15:37:39 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Sep 11 15:37:46 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 24 +++++----
 .../kafka/clients/admin/KafkaAdminClient.java   | 19 ++++---
 .../kafka/clients/consumer/KafkaConsumer.java   |  5 +-
 .../kafka/clients/producer/KafkaProducer.java   |  5 +-
 .../apache/kafka/common/network/Selector.java   | 35 +++++++------
 .../apache/kafka/common/utils/LogContext.java   |  4 ++
 .../apache/kafka/clients/NetworkClientTest.java |  7 +--
 .../clients/consumer/internals/FetcherTest.java |  2 +-
 .../clients/producer/internals/SenderTest.java  |  2 +-
 .../kafka/common/network/NetworkTestUtils.java  |  3 +-
 .../kafka/common/network/NioEchoServer.java     | 19 +++----
 .../kafka/common/network/SelectorTest.java      | 20 ++++----
 .../kafka/common/network/SslSelectorTest.java   |  7 +--
 .../common/network/SslTransportLayerTest.java   |  9 ++--
 .../runtime/distributed/WorkerGroupMember.java  |  5 +-
 .../main/scala/kafka/admin/AdminClient.scala    |  6 ++-
 .../controller/ControllerChannelManager.scala   | 11 ++--
 .../transaction/TransactionCoordinator.scala    | 18 ++++---
 .../TransactionMarkerChannelManager.scala       | 14 ++---
 .../main/scala/kafka/network/SocketServer.scala | 14 +++--
 .../main/scala/kafka/server/KafkaServer.scala   | 17 +++---
 .../server/ReplicaFetcherBlockingSend.scala     | 11 ++--
 .../kafka/server/ReplicaFetcherThread.scala     | 54 +++++++++++---------
 .../TransactionCoordinatorTest.scala            |  5 +-
 .../unit/kafka/network/SocketServerTest.scala   | 11 ++--
 .../epoch/LeaderEpochIntegrationTest.scala      |  4 +-
 .../processor/internals/StreamsKafkaClient.java | 15 ++++--
 27 files changed, 206 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 897cca5..31b8edb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -35,10 +35,10 @@ import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -59,7 +59,7 @@ import java.util.Random;
  */
 public class NetworkClient implements KafkaClient {
 
-    private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
+    private final Logger log;
 
     /* the selector used to perform network i/o */
     private final Selectable selector;
@@ -118,11 +118,12 @@ public class NetworkClient implements KafkaClient {
                          int requestTimeoutMs,
                          Time time,
                          boolean discoverBrokerVersions,
-                         ApiVersions apiVersions) {
+                         ApiVersions apiVersions,
+                         LogContext logContext) {
         this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
              reconnectBackoffMs, reconnectBackoffMax,
              socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
-             discoverBrokerVersions, apiVersions, null);
+             discoverBrokerVersions, apiVersions, null, logContext);
     }
 
     public NetworkClient(Selectable selector,
@@ -137,12 +138,12 @@ public class NetworkClient implements KafkaClient {
             Time time,
             boolean discoverBrokerVersions,
             ApiVersions apiVersions,
-            Sensor throttleTimeSensor) {
-
+            Sensor throttleTimeSensor,
+            LogContext logContext) {
         this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
              reconnectBackoffMs, reconnectBackoffMax,
              socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
-             discoverBrokerVersions, apiVersions, throttleTimeSensor);
+             discoverBrokerVersions, apiVersions, throttleTimeSensor, logContext);
     }
 
     public NetworkClient(Selectable selector,
@@ -156,11 +157,12 @@ public class NetworkClient implements KafkaClient {
                          int requestTimeoutMs,
                          Time time,
                          boolean discoverBrokerVersions,
-                         ApiVersions apiVersions) {
+                         ApiVersions apiVersions,
+                         LogContext logContext) {
         this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection,
              reconnectBackoffMs, reconnectBackoffMax,
              socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
-             discoverBrokerVersions, apiVersions, null);
+             discoverBrokerVersions, apiVersions, null, logContext);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -176,7 +178,8 @@ public class NetworkClient implements KafkaClient {
                           Time time,
                           boolean discoverBrokerVersions,
                           ApiVersions apiVersions,
-                          Sensor throttleTimeSensor) {
+                          Sensor throttleTimeSensor,
+                          LogContext logContext) {
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
          * super constructor is invoked.
@@ -202,6 +205,7 @@ public class NetworkClient implements KafkaClient {
         this.discoverBrokerVersions = discoverBrokerVersions;
         this.apiVersions = apiVersions;
         this.throttleTimeSensor = throttleTimeSensor;
+        this.log = logContext.logger(NetworkClient.class);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 88d99c3..20db6b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -289,6 +289,7 @@ public class KafkaAdminClient extends AdminClient {
         ChannelBuilder channelBuilder = null;
         Selector selector = null;
         ApiVersions apiVersions = new ApiVersions();
+        LogContext logContext = createLogContext(clientId);
 
         try {
             // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
@@ -307,7 +308,7 @@ public class KafkaAdminClient extends AdminClient {
             String metricGrpPrefix = "admin-client";
             channelBuilder = ClientUtils.createChannelBuilder(config);
             selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                    metrics, time, metricGrpPrefix, channelBuilder);
+                    metrics, time, metricGrpPrefix, channelBuilder, logContext);
             networkClient = new NetworkClient(
                 selector,
                 metadata,
@@ -320,9 +321,10 @@ public class KafkaAdminClient extends AdminClient {
                 (int) TimeUnit.HOURS.toMillis(1),
                 time,
                 true,
-                apiVersions);
+                apiVersions,
+                logContext);
             return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient,
-                timeoutProcessorFactory);
+                timeoutProcessorFactory, logContext);
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
             closeQuietly(networkClient, "NetworkClient");
@@ -338,18 +340,23 @@ public class KafkaAdminClient extends AdminClient {
 
         try {
             metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
-            return new KafkaAdminClient(config, clientId, time, metadata, metrics, client, null);
+            return new KafkaAdminClient(config, clientId, time, metadata, metrics, client, null,
+                    createLogContext(clientId));
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
             throw new KafkaException("Failed create new KafkaAdminClient", exc);
         }
     }
 
+    private static LogContext createLogContext(String clientId) {
+        return new LogContext("[AdminClient clientId=" + clientId + "] ");
+    }
+
     private KafkaAdminClient(AdminClientConfig config, String clientId, Time time, Metadata metadata,
-                     Metrics metrics, KafkaClient client, TimeoutProcessorFactory timeoutProcessorFactory) {
+                     Metrics metrics, KafkaClient client, TimeoutProcessorFactory timeoutProcessorFactory,
+                     LogContext logContext) {
         this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.clientId = clientId;
-        LogContext logContext = new LogContext("[AdminClient clientId=" + clientId + "] ");
         this.log = logContext.logger(KafkaAdminClient.class);
         this.time = time;
         this.metadata = metadata;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1cdb132..5f63af5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -704,7 +704,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
 
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
+                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice for max in-flight requests
@@ -716,7 +716,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     time,
                     true,
                     new ApiVersions(),
-                    throttleTimeSensor);
+                    throttleTimeSensor,
+                    logContext);
             this.client = new ConsumerNetworkClient(
                     logContext,
                     netClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 8766107..800e98b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -385,7 +385,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics, metricsRegistry.senderMetrics);
             NetworkClient client = new NetworkClient(
                     new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                            this.metrics, time, "producer", channelBuilder),
+                            this.metrics, time, "producer", channelBuilder, logContext),
                     this.metadata,
                     clientId,
                     maxInflightRequests,
@@ -397,7 +397,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     time,
                     true,
                     apiVersions,
-                    throttleTimeSensor);
+                    throttleTimeSensor,
+                    logContext);
             this.sender = new Sender(logContext,
                     client,
                     this.metadata,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index d321ba5..1e5d969 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -48,9 +48,9 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A nioSelector interface for doing non-blocking multi-connection network I/O.
@@ -84,8 +84,8 @@ import org.slf4j.LoggerFactory;
 public class Selector implements Selectable, AutoCloseable {
 
     public static final long NO_IDLE_TIMEOUT_MS = -1;
-    private static final Logger log = LoggerFactory.getLogger(Selector.class);
 
+    private final Logger log;
     private final java.nio.channels.Selector nioSelector;
     private final Map<String, KafkaChannel> channels;
     private final Set<KafkaChannel> explicitlyMutedChannels;
@@ -113,7 +113,6 @@ public class Selector implements Selectable, AutoCloseable {
 
     /**
      * Create a new nioSelector
-     *
      * @param maxReceiveSize Max size in bytes of a single network receive (use {@link NetworkReceive#UNLIMITED} for no limit)
      * @param connectionMaxIdleMs Max idle connection time (use {@link #NO_IDLE_TIMEOUT_MS} to disable idle timeout)
      * @param metrics Registry for Selector metrics
@@ -122,17 +121,19 @@ public class Selector implements Selectable, AutoCloseable {
      * @param metricTags Additional tags to add to metrics registered by Selector
      * @param metricsPerConnection Whether or not to enable per-connection metrics
      * @param channelBuilder Channel builder for every new connection
+     * @param logContext Context for logging with additional info
      */
     public Selector(int maxReceiveSize,
-                    long connectionMaxIdleMs,
-                    Metrics metrics,
-                    Time time,
-                    String metricGrpPrefix,
-                    Map<String, String> metricTags,
-                    boolean metricsPerConnection,
-                    boolean recordTimePerConnection,
-                    ChannelBuilder channelBuilder,
-                    MemoryPool memoryPool) {
+            long connectionMaxIdleMs,
+            Metrics metrics,
+            Time time,
+            String metricGrpPrefix,
+            Map<String, String> metricTags,
+            boolean metricsPerConnection,
+            boolean recordTimePerConnection,
+            ChannelBuilder channelBuilder,
+            MemoryPool memoryPool,
+            LogContext logContext) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
@@ -158,6 +159,7 @@ public class Selector implements Selectable, AutoCloseable {
         this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
         this.memoryPool = memoryPool;
         this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
+        this.log = logContext.logger(Selector.class);
     }
 
     public Selector(int maxReceiveSize,
@@ -167,12 +169,13 @@ public class Selector implements Selectable, AutoCloseable {
             String metricGrpPrefix,
             Map<String, String> metricTags,
             boolean metricsPerConnection,
-            ChannelBuilder channelBuilder) {
-        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE);
+            ChannelBuilder channelBuilder,
+            LogContext logContext) {
+        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE, logContext);
     }
 
-    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder);
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder, logContext);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java b/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java
index bbf51fa..8d34ad0 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/LogContext.java
@@ -43,6 +43,10 @@ public class LogContext {
         return new KafkaLogger(clazz, logPrefix);
     }
 
+    public String logPrefix() {
+        return logPrefix;
+    }
+
     private static class KafkaLogger implements Logger {
         private final Logger logger;
         private final String logPrefix;

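The new public logPrefix() accessor hands back the prefix string given to the constructor; the Scala broker classes later in this diff (TransactionCoordinator, SocketServer, KafkaServer, ReplicaFetcherThread) assign it to the Logging trait's logIdent, so the Scala logging trait and the LogContext-created slf4j loggers share one prefix. A small sketch, illustrative only:

----------------------------------------------------------------------
import org.apache.kafka.common.utils.LogContext;

public class LogPrefixDemo {
    public static void main(String[] args) {
        LogContext logContext = new LogContext("[KafkaServer id=0] ");
        // logPrefix() returns the prefix string unchanged, which Scala code
        // can assign to logIdent (see the KafkaServer.scala hunk below)
        System.out.println(logContext.logPrefix() + "starting up");
    }
}
----------------------------------------------------------------------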
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 069fb7a..4c3019c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
@@ -64,19 +65,19 @@ public class NetworkClientTest {
     private NetworkClient createNetworkClient() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxTest,
-                64 * 1024, 64 * 1024, requestTimeoutMs, time, true, new ApiVersions());
+                64 * 1024, 64 * 1024, requestTimeoutMs, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
         return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
                 "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, requestTimeoutMs,
-                time, true, new ApiVersions());
+                time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxTest,
-                64 * 1024, 64 * 1024, requestTimeoutMs, time, false, new ApiVersions());
+                64 * 1024, 64 * 1024, requestTimeoutMs, time, false, new ApiVersions(), new LogContext());
     }
 
     @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5fe4369..b2ede15 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1129,7 +1129,7 @@ public class FetcherTest {
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 1000, 1000, 64 * 1024, 64 * 1024, 1000,
-                time, true, new ApiVersions(), throttleTimeSensor);
+                time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
         ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 096e7c1..85b5ba6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -240,7 +240,7 @@ public class SenderTest {
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 1000, 1000, 64 * 1024, 64 * 1024, 1000,
-                time, true, new ApiVersions(), throttleTimeSensor);
+                time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
         ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index 68089e6..d0aa4c5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
@@ -42,7 +43,7 @@ public class NetworkTestUtils {
     }
 
     public static Selector createSelector(ChannelBuilder channelBuilder) {
-        return new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
+        return new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
     }
 
     public static void checkClientConnection(Selector selector, String node, int minMessageSize, int messageCount) throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index fd4ef69..ed84958 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -16,6 +16,15 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.scram.ScramCredentialUtils;
+import org.apache.kafka.common.security.scram.ScramMechanism;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -28,14 +37,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.security.authenticator.CredentialCache;
-import org.apache.kafka.common.security.scram.ScramCredentialUtils;
-import org.apache.kafka.common.security.scram.ScramMechanism;
-import org.apache.kafka.common.utils.MockTime;
-
 /**
  * Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
  * with the configured security protocol.
@@ -66,7 +67,7 @@ public class NioEchoServer extends Thread {
             ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
         if (channelBuilder == null)
             channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
         acceptorThread = new AcceptorThread();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index ececff6..8894873 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -39,13 +40,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -56,6 +50,12 @@ import java.util.Set;
 import static org.easymock.EasyMock.createControl;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
 /**
@@ -80,7 +80,7 @@ public class SelectorTest {
         this.channelBuilder = new PlaintextChannelBuilder();
         this.channelBuilder.configure(configs);
         this.metrics = new Metrics();
-        this.selector = new Selector(5000, this.metrics, time, "MetricGroup", channelBuilder);
+        this.selector = new Selector(5000, this.metrics, time, "MetricGroup", channelBuilder, new LogContext());
     }
 
     @After
@@ -300,7 +300,7 @@ public class SelectorTest {
             public void close() {
             }
         };
-        Selector selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
+        Selector selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
         SocketChannel socketChannel = SocketChannel.open();
         socketChannel.configureBlocking(false);
         try {
@@ -413,7 +413,7 @@ public class SelectorTest {
         selector.close();
         MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
         selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
-            new HashMap<String, String>(), true, false, channelBuilder, pool);
+            new HashMap<String, String>(), true, false, channelBuilder, pool, new LogContext());
 
         try (ServerSocketChannel ss = ServerSocketChannel.open()) {
             ss.bind(new InetSocketAddress(0));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index bada4d4..bea8084 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.ssl.SslFactory;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.After;
@@ -65,7 +66,7 @@ public class SslSelectorTest extends SelectorTest {
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
         this.channelBuilder.configure(sslClientConfigs);
         this.metrics = new Metrics();
-        this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder);
+        this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext());
     }
 
     @After
@@ -98,7 +99,7 @@ public class SslSelectorTest extends SelectorTest {
             }
         };
         channelBuilder.configure(sslClientConfigs);
-        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", channelBuilder);
+        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", channelBuilder, new LogContext());
         try {
             int reqs = 500;
             String node = "0";
@@ -183,7 +184,7 @@ public class SslSelectorTest extends SelectorTest {
         channelBuilder = new SslChannelBuilder(Mode.SERVER);
         channelBuilder.configure(sslServerConfigs);
         selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup", 
-                new HashMap<String, String>(), true, false, channelBuilder, pool);
+                new HashMap<String, String>(), true, false, channelBuilder, pool, new LogContext());
 
         try (ServerSocketChannel ss = ServerSocketChannel.open()) {
             ss.bind(new InetSocketAddress(0));

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 8338ad7..2ddd349 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
@@ -76,7 +77,7 @@ public class SslTransportLayerTest {
         sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
         this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
     }
 
     @After
@@ -581,7 +582,7 @@ public class SslTransportLayerTest {
     public void testNetworkThreadTimeRecorded() throws Exception {
         selector.close();
         this.selector = new Selector(NetworkReceive.UNLIMITED, 5000, new Metrics(), Time.SYSTEM,
-                "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder, MemoryPool.NONE);
+                "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder, MemoryPool.NONE, new LogContext());
 
         String node = "0";
         server = createEchoServer(SecurityProtocol.SSL);
@@ -623,7 +624,7 @@ public class SslTransportLayerTest {
         String node = "0";
         server = createEchoServer(securityProtocol);
         clientChannelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", clientChannelBuilder);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", clientChannelBuilder, new LogContext());
         InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
         selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
 
@@ -672,7 +673,7 @@ public class SslTransportLayerTest {
 
         };
         this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
     }
 
     private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 4b7711a..d61a29c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -98,7 +98,7 @@ public class WorkerGroupMember {
             String metricGrpPrefix = "connect";
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
+                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice
@@ -109,7 +109,8 @@ public class WorkerGroupMember {
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
                     time,
                     true,
-                    new ApiVersions());
+                    new ApiVersions(),
+                    logContext);
             this.client = new ConsumerNetworkClient(
                     logContext,
                     netClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 8f2e1ba..fadacec 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -452,7 +452,8 @@ object AdminClient {
       metrics,
       time,
       "admin",
-      channelBuilder)
+      channelBuilder,
+      new LogContext())
 
     val networkClient = new NetworkClient(
       selector,
@@ -466,7 +467,8 @@ object AdminClient {
       requestTimeoutMs,
       time,
       true,
-      new ApiVersions)
+      new ApiVersions,
+      new LogContext())
 
     val highLevelClient = new ConsumerNetworkClient(
       new LogContext(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 57ff203..b34284f 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.JaasContext
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
@@ -108,6 +108,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     val messageQueue = new LinkedBlockingQueue[QueueItem]
     debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
     val brokerNode = broker.getNode(config.interBrokerListenerName)
+    val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
     val networkClient = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
         config.interBrokerSecurityProtocol,
@@ -123,9 +124,10 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         metrics,
         time,
         "controller-channel",
-        Map("broker-id" -> broker.id.toString).asJava,
+        Map("broker-id" -> brokerNode.idString).asJava,
         false,
-        channelBuilder
+        channelBuilder,
+        logContext
       )
       new NetworkClient(
         selector,
@@ -139,7 +141,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         config.requestTimeoutMs,
         time,
         false,
-        new ApiVersions
+        new ApiVersions,
+        logContext
       )
     }
     val threadName = threadNamePrefix match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index e201e91..6b9b7ef 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.TransactionResult
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{LogContext, Time}
 
 object TransactionCoordinator {
 
@@ -52,11 +52,16 @@ object TransactionCoordinator {
 
     val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils)
     // we do not need to turn on reaper thread since no tasks will be expired and there are no completed tasks to be purged
-    val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false, timerEnabled = false)
+    val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId,
+      reaperEnabled = false, timerEnabled = false)
     val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
-    val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager, txnMarkerPurgatory, time)
 
-    new TransactionCoordinator(config.brokerId, scheduler, producerIdManager, txnStateManager, txnMarkerChannelManager, time)
+    val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
+    val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
+      txnMarkerPurgatory, time, logContext)
+
+    new TransactionCoordinator(config.brokerId, scheduler, producerIdManager, txnStateManager, txnMarkerChannelManager,
+      time, logContext)
   }
 
   private def initTransactionError(error: Errors): InitProducerIdResult = {
@@ -81,8 +86,9 @@ class TransactionCoordinator(brokerId: Int,
                              producerIdManager: ProducerIdManager,
                              txnManager: TransactionStateManager,
                              txnMarkerChannelManager: TransactionMarkerChannelManager,
-                             time: Time) extends Logging {
-  this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
+                             time: Time,
+                             logContext: LogContext) extends Logging {
+  this.logIdent = logContext.logPrefix
 
   import TransactionCoordinator._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 9c3ffd9..2e666ed 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -27,12 +27,10 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
 import org.apache.kafka.common.security.JaasContext
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
-
 import com.yammer.metrics.core.Gauge
-
 import java.util
 import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
 
@@ -45,8 +43,8 @@ object TransactionMarkerChannelManager {
             metadataCache: MetadataCache,
             txnStateManager: TransactionStateManager,
             txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
-            time: Time): TransactionMarkerChannelManager = {
-
+            time: Time,
+            logContext: LogContext): TransactionMarkerChannelManager = {
     val channelBuilder = ChannelBuilders.clientChannelBuilder(
       config.interBrokerSecurityProtocol,
       JaasContext.Type.SERVER,
@@ -63,7 +61,8 @@ object TransactionMarkerChannelManager {
       "txn-marker-channel",
       Map.empty[String, String].asJava,
       false,
-      channelBuilder
+      channelBuilder,
+      logContext
     )
     val networkClient = new NetworkClient(
       selector,
@@ -77,7 +76,8 @@ object TransactionMarkerChannelManager {
       config.requestTimeoutMs,
       time,
       false,
-      new ApiVersions
+      new ApiVersions,
+      logContext
     )
 
     new TransactionMarkerChannelManager(config,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index c93fb55..ffe75a8 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.types.SchemaException
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
-import org.apache.kafka.common.utils.{KafkaThread, Time}
+import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
 
 import scala.collection._
 import JavaConverters._
@@ -62,7 +62,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private val maxConnectionsPerIp = config.maxConnectionsPerIp
   private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
 
-  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
+  private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
+  this.logIdent = logContext.logPrefix
 
   private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
   private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")
@@ -166,7 +167,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       config,
       metrics,
       credentialProvider,
-      memoryPool
+      memoryPool,
+      logContext
     )
   }
 
@@ -382,7 +384,8 @@ private[kafka] class Processor(val id: Int,
                                config: KafkaConfig,
                                metrics: Metrics,
                                credentialProvider: CredentialProvider,
-                               memoryPool: MemoryPool) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                               memoryPool: MemoryPool,
+                               logContext: LogContext) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -430,7 +433,8 @@ private[kafka] class Processor(val id: Int,
     false,
     true,
     channelBuilder,
-    memoryPool)
+    memoryPool,
+    logContext)
 
   // Connection ids have the format `localAddr:localPort-remoteAddr:remotePort-index`. The index is a
   // non-negative incrementing value that ensures that even if remotePort is reused after a connection is

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2689980..69c81e6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,11 +43,11 @@ import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
-import org.apache.kafka.common.utils.{AppInfoParser, Time}
+import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Seq, Map, mutable}
+import scala.collection.{Map, Seq, mutable}
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -101,6 +101,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private val jmxPrefix: String = "kafka.server"
 
+  private var logContext: LogContext = null
+
   var metrics: Metrics = null
 
   val brokerState: BrokerState = new BrokerState
@@ -162,7 +164,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     "yammer-metrics-count",
     new Gauge[Int] {
       def value = {
-        com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
+        com.yammer.metrics.Metrics.defaultRegistry.allMetrics.size
       }
     }
   )
@@ -198,7 +200,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         /* generate brokerId */
         val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
         config.brokerId = brokerId
-        this.logIdent = "[Kafka Server " + config.brokerId + "], "
+        logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
+        this.logIdent = logContext.logPrefix
 
         /* create and configure metrics */
         val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
@@ -381,7 +384,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           "kafka-server-controlled-shutdown",
           Map.empty.asJava,
           false,
-          channelBuilder
+          channelBuilder,
+          logContext
         )
         new NetworkClient(
           selector,
@@ -395,7 +399,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           config.requestTimeoutMs,
           time,
           false,
-          new ApiVersions)
+          new ApiVersions,
+          logContext)
       }
 
       var shutdownSucceeded: Boolean = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index c24cf0a..0bf2bd3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.security.JaasContext
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.requests.AbstractRequest.Builder
@@ -43,7 +43,8 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
                                  metrics: Metrics,
                                  time: Time,
                                  fetcherId: Int,
-                                 clientId: String) extends BlockingSend {
+                                 clientId: String,
+                                 logContext: LogContext) extends BlockingSend {
 
   private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
   private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
@@ -65,7 +66,8 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
       "replica-fetcher",
       Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
       false,
-      channelBuilder
+      channelBuilder,
+      logContext
     )
     new NetworkClient(
       selector,
@@ -79,7 +81,8 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
       brokerConfig.requestTimeoutMs,
       time,
       false,
-      new ApiVersions
+      new ApiVersions,
+      logContext
     )
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index e856ca1..cf652d6 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, ListOffsetRequest, ListOffsetResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, FetchRequest => JFetchRequest}
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{LogContext, Time}
 import scala.collection.JavaConverters._
 import scala.collection.{Map, mutable}
 
@@ -54,8 +54,13 @@ class ReplicaFetcherThread(name: String,
   type REQ = FetchRequest
   type PD = PartitionData
 
+  private val replicaId = brokerConfig.brokerId
+  private val logContext = new LogContext(s"[ReplicaFetcher replicaId=$replicaId, leaderId=${sourceBroker.id}, " +
+    s"fetcherId=$fetcherId] ")
+  this.logIdent = logContext.logPrefix
   private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse(
-    new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId, s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId"))
+    new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId,
+      s"broker-$replicaId-fetcher-$fetcherId", logContext))
   private val fetchRequestVersion: Short =
     if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
@@ -63,7 +68,6 @@ class ReplicaFetcherThread(name: String,
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
     else 0
-  private val replicaId = brokerConfig.brokerId
   private val maxWait = brokerConfig.replicaFetchWaitMaxMs
   private val minBytes = brokerConfig.replicaFetchMinBytes
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
@@ -85,17 +89,18 @@ class ReplicaFetcherThread(name: String,
     maybeWarnIfOversizedRecords(records, topicPartition)
 
     if (fetchOffset != replica.logEndOffset.messageOffset)
-      throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
+      throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
+        topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
     if (logger.isTraceEnabled)
-      trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
-        .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
+      trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
+        .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
     replica.log.get.appendAsFollower(records)
 
     if (logger.isTraceEnabled)
-      trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
-        .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
+      trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
+        .format(replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
     val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
     val leaderLogStartOffset = partitionData.logStartOffset
     // for the follower replica, we do not need to keep
@@ -104,7 +109,7 @@ class ReplicaFetcherThread(name: String,
     replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
     replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
     if (logger.isTraceEnabled)
-      trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
+      trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
     if (quota.isThrottled(topicPartition))
       quota.record(records.sizeInBytes)
     replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
@@ -144,14 +149,13 @@ class ReplicaFetcherThread(name: String,
       if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
         ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
-        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader " +
-          s"${sourceBroker.id}'s latest offset $leaderEndOffset is less than replica ${brokerConfig.brokerId}'s latest " +
-          s"offset ${replica.logEndOffset.messageOffset}")
+        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
+          s"latest offset $leaderEndOffset is less than replica's latest offset ${replica.logEndOffset.messageOffset}")
         throw new FatalExitError
       }
 
-      warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
-        .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
+      warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
+        s"leader's latest offset $leaderEndOffset")
       replicaMgr.logManager.truncateTo(Map(topicPartition -> leaderEndOffset))
       leaderEndOffset
     } else {
@@ -178,8 +182,8 @@ class ReplicaFetcherThread(name: String,
        *
        */
       val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)
-      warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
-        .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
+      warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
+        s"leader's start offset $leaderStartOffset")
       val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
       // Only truncate log when current leader's log start offset is greater than follower's log end offset.
       if (leaderStartOffset > replica.logEndOffset.messageOffset)
@@ -256,9 +260,9 @@ class ReplicaFetcherThread(name: String,
         partitionsWithError += tp
       } else {
         val truncationOffset =
-          if (epochOffset.endOffset() == UNDEFINED_EPOCH_OFFSET)
+          if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET)
             highWatermark(replica, epochOffset)
-          else if (epochOffset.endOffset() >= replica.logEndOffset.messageOffset)
+          else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
             logEndOffset(replica, epochOffset)
           else
             epochOffset.endOffset
@@ -279,7 +283,7 @@ class ReplicaFetcherThread(name: String,
       .filter { case (_, state) => state.isTruncatingLog }
       .map { case (tp, _) => tp -> epochCache(tp).latestEpoch }.toMap
 
-    debug(s"Build leaderEpoch request $result for broker $sourceBroker")
+    debug(s"Build leaderEpoch request $result")
 
     result
   }
@@ -292,7 +296,7 @@ class ReplicaFetcherThread(name: String,
       try {
         val response = leaderEndpoint.sendRequest(epochRequest)
         result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
-        debug(s"Receive leaderEpoch response $result from broker $sourceBroker")
+        debug(s"Receive leaderEpoch response $result")
       } catch {
         case t: Throwable =>
           warn(s"Error when sending leader epoch request for $partitions", t)
@@ -314,12 +318,14 @@ class ReplicaFetcherThread(name: String,
 
   private def logEndOffset(replica: Replica, epochOffset: EpochEndOffset): Long = {
     val logEndOffset = replica.logEndOffset.messageOffset
-    info(s"Based on follower's leader epoch, leader replied with an offset ${epochOffset.endOffset()} >= the follower's log end offset $logEndOffset in ${replica.topicPartition}. No truncation needed.")
+    info(s"Based on follower's leader epoch, leader replied with an offset ${epochOffset.endOffset} >= the " +
+      s"follower's log end offset $logEndOffset in ${replica.topicPartition}. No truncation needed.")
     logEndOffset
   }
 
   private def highWatermark(replica: Replica, epochOffset: EpochEndOffset): Long = {
-    warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. High watermark ${replica.highWatermark.messageOffset} will be used for truncation.")
+    warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
+      s"High watermark ${replica.highWatermark.messageOffset} will be used for truncation.")
     replica.highWatermark.messageOffset
   }
 
@@ -336,9 +342,9 @@ class ReplicaFetcherThread(name: String,
 object ReplicaFetcherThread {
 
   private[server] class FetchRequest(val underlying: JFetchRequest.Builder) extends AbstractFetcherThread.FetchRequest {
-    def isEmpty: Boolean = underlying.fetchData().isEmpty
+    def isEmpty: Boolean = underlying.fetchData.isEmpty
     def offset(topicPartition: TopicPartition): Long =
-      underlying.fetchData().asScala(topicPartition).fetchOffset
+      underlying.fetchData.asScala(topicPartition).fetchOffset
     override def toString = underlying.toString
   }
 

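The thread-level change is the most instructive part of the patch: replicaId is hoisted above the LogContext so it can be embedded in the prefix, `this.logIdent = logContext.logPrefix` bridges that prefix into the Scala Logging trait, and the per-message "Follower %d" / "broker $sourceBroker" fragments are dropped because the prefix already carries those ids. A REPL-style sketch of the effect on one line, using hypothetical ids and the prefix format copied from the diff:

    import org.apache.kafka.common.utils.LogContext

    val (replicaId, leaderId, fetcherId) = (0, 1, 0) // hypothetical ids
    val logContext = new LogContext(
      s"[ReplicaFetcher replicaId=$replicaId, leaderId=$leaderId, fetcherId=$fetcherId] ")
    val log = logContext.logger(getClass)
    // Before: "Follower 0 set replica high watermark for partition t-0 to 42"
    // After, with the prefix supplying the ids:
    log.trace("Follower set replica high watermark for partition t-0 to 42")
    // => [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Follower set replica high watermark ...
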
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 023a7ca..dae52c8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -20,7 +20,7 @@ import kafka.utils.MockScheduler
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.TransactionResult
-import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.utils.{LogContext, MockTime}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.Test
@@ -52,7 +52,8 @@ class TransactionCoordinatorTest {
     pidManager,
     transactionManager,
     transactionMarkerChannelManager,
-    time)
+    time,
+    new LogContext)
 
   var result: InitProducerIdResult = _
   var error: Errors = Errors.NONE

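Tests that don't care about the prefix can use the no-argument constructor seen above, which defaults to an empty prefix, leaving log output untouched:

    import org.apache.kafka.common.utils.LogContext

    // Default context for tests: no prefix, messages pass through unchanged.
    val ctx = new LogContext()
    assert(ctx.logPrefix == "")
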
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 227714b..2df37b7 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{MockTime, Time}
+import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
 import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
@@ -284,7 +284,7 @@ class SocketServerTest extends JUnitSuite {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
-          config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool) {
+          config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) {
           override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
           override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
            val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
@@ -518,7 +518,7 @@ class SocketServerTest extends JUnitSuite {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
-          config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, MemoryPool.NONE) {
+          config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, MemoryPool.NONE, new LogContext()) {
           override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
             conn.close()
             super.sendResponse(response, responseSend)
@@ -899,7 +899,8 @@ class SocketServerTest extends JUnitSuite {
     override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
       new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
-        config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool) {
+        config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new
+            LogContext()) {
 
         override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
            val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
@@ -947,7 +948,7 @@ class SocketServerTest extends JUnitSuite {
 
   class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics)
         extends Selector(config.socketRequestMaxBytes, config.connectionsMaxIdleMs,
-            metrics, time, "socket-server", new HashMap, false, true, channelBuilder, MemoryPool.NONE) {
+            metrics, time, "socket-server", new HashMap, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
 
     val failures = mutable.Map[SelectorOperation, Exception]()
     val operationCounts = mutable.Map[SelectorOperation, Int]().withDefaultValue(0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 9d9b1c7..89c12fe 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.serialization.StringSerializer
-import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.utils.{LogContext, SystemTime}
 import org.apache.kafka.common.TopicPartition
 
 import org.junit.Assert._
@@ -216,7 +216,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
   private def sender(from: KafkaServer, to: KafkaServer): BlockingSend = {
     val endPoint = from.metadataCache.getAliveBrokers.find(_.id == to.config.brokerId).get.getBrokerEndPoint(from.config.interBrokerListenerName)
-    new ReplicaFetcherBlockingSend(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher")
+    new ReplicaFetcherBlockingSend(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher", new LogContext())
   }
 
   private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bb06c3e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index b24614f..d725ed8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -128,18 +129,21 @@ public class StreamsKafkaClient {
         final Metrics metrics = new Metrics(metricConfig, reporters, time);
 
         final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig);
+        final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
+        final LogContext logContext = createLogContext(clientId);
 
         final Selector selector = new Selector(
                 streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                 metrics,
                 time,
                 "kafka-client",
-                channelBuilder);
+                channelBuilder,
+                logContext);
 
         final KafkaClient kafkaClient = new NetworkClient(
                 selector,
                 metadata,
-                streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG),
+                clientId,
                 MAX_INFLIGHT_REQUESTS, // a fixed large enough value will suffice
                 streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG),
                 streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
@@ -148,10 +152,15 @@ public class StreamsKafkaClient {
                 streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG),
                 time,
                 true,
-                new ApiVersions());
+                new ApiVersions(),
+                logContext);
         return new StreamsKafkaClient(streamsConfig, kafkaClient, reporters);
     }
 
+    private static LogContext createLogContext(String clientId) {
+        return new LogContext("[StreamsKafkaClient clientId=" + clientId + "] ");
+    }
+
     public static StreamsKafkaClient create(final StreamsConfig streamsConfig) {
         return create(Config.fromStreamsConfig(streamsConfig));
     }

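StreamsKafkaClient follows the same recipe: hoist the client id into a local, derive the LogContext from it, and hand the context to both Selector and NetworkClient. The createLogContext helper is trivial but keeps the prefix format in one place; a Scala transcription, for illustration only:

    import org.apache.kafka.common.utils.LogContext

    // Scala rendering of StreamsKafkaClient.createLogContext.
    def createLogContext(clientId: String): LogContext =
      new LogContext(s"[StreamsKafkaClient clientId=$clientId] ")

    val ctx = createLogContext("my-streams-app") // hypothetical client id
    println(ctx.logPrefix) // prints the prefix, including its trailing space
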
