From commits-return-3651-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Fri Feb 26 23:00:45 2016 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8CA0F184EF for ; Fri, 26 Feb 2016 23:00:45 +0000 (UTC) Received: (qmail 298 invoked by uid 500); 26 Feb 2016 23:00:45 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 267 invoked by uid 500); 26 Feb 2016 23:00:45 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 257 invoked by uid 99); 26 Feb 2016 23:00:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Feb 2016 23:00:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0D0F9E0211; Fri, 26 Feb 2016 23:00:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-3278: concatenate thread name to clientId when producer and consumers config is created Date: Fri, 26 Feb 2016 23:00:45 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk 4e0ae79d5 -> d4e60b9f5 KAFKA-3278: concatenate thread name to clientId when producer and consumers config is created guozhangwang made the changes as requested, I reverted my original commit and that seems to have closed the other pull request - sorry if that mucks up the process a bit Author: tomdearman Reviewers: Guozhang Wang Closes #978 from tomdearman/KAFKA-3278 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d4e60b9f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d4e60b9f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d4e60b9f Branch: refs/heads/trunk Commit: d4e60b9f59e04e82125b404f173c4fccc949d906 Parents: 4e0ae79 Author: Tom Dearman Authored: Fri Feb 26 15:00:38 2016 -0800 Committer: Guozhang Wang Committed: Fri Feb 26 15:00:38 2016 -0800 ---------------------------------------------------------------------- .../streams/processor/internals/StreamThread.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d4e60b9f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7d460e1..7392d9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -219,22 +219,25 @@ public class StreamThread extends Thread { } private Producer createProducer() { - log.info("Creating producer client for stream thread [" + this.getName() + "]"); - return new KafkaProducer<>(config.getProducerConfigs(this.clientId), + String threadName = this.getName(); + log.info("Creating producer client for stream thread [" + threadName + "]"); + return new KafkaProducer<>(config.getProducerConfigs(this.clientId + "-" + threadName), new ByteArraySerializer(), new ByteArraySerializer()); } private Consumer createConsumer() { - log.info("Creating consumer client for stream thread [" + this.getName() + "]"); - return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId), + String threadName = this.getName(); + log.info("Creating consumer client for stream thread [" + threadName + "]"); + return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId + "-" + threadName), new ByteArrayDeserializer(), new ByteArrayDeserializer()); } private Consumer createRestoreConsumer() { - log.info("Creating restore consumer client for stream thread [" + this.getName() + "]"); - return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId), + String threadName = this.getName(); + log.info("Creating restore consumer client for stream thread [" + threadName + "]"); + return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId + "-" + threadName), new ByteArrayDeserializer(), new ByteArrayDeserializer()); }