From commits-return-10657-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Sun Oct 28 04:46:10 2018
Return-Path:
X-Original-To: apmail-kafka-commits-archive@www.apache.org
Delivered-To: apmail-kafka-commits-archive@www.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
	by minotaur.apache.org (Postfix) with SMTP id 3F8E818EC6
	for ; Sun, 28 Oct 2018 04:46:10 +0000 (UTC)
Received: (qmail 17969 invoked by uid 500); 28 Oct 2018 04:46:10 -0000
Delivered-To: apmail-kafka-commits-archive@kafka.apache.org
Received: (qmail 17917 invoked by uid 500); 28 Oct 2018 04:46:09 -0000
Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@kafka.apache.org
Delivered-To: mailing list commits@kafka.apache.org
Received: (qmail 17908 invoked by uid 99); 28 Oct 2018 04:46:09 -0000
Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70)
	by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 28 Oct 2018 04:46:09 +0000
Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33)
	id 4ED4382970; Sun, 28 Oct 2018 04:46:09 +0000 (UTC)
Date: Sun, 28 Oct 2018 04:46:08 +0000
To: "commits@kafka.apache.org"
Subject: [kafka] branch trunk updated: MINOR: Remove duplicate `subscribe` call in ConsumerPerformance (#5828)
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
Message-ID: <154070196774.432.10251437435899498904@gitbox.apache.org>
From: ijuma@apache.org
X-Git-Host: gitbox.apache.org
X-Git-Repo: kafka
X-Git-Refname: refs/heads/trunk
X-Git-Reftype: branch
X-Git-Oldrev: 63715efa02861b25b3be84c2970342aebec7a439
X-Git-Newrev: 5d7cb438a5607fd1bba35ee7a7cf1b2924bae45d
X-Git-Rev: 5d7cb438a5607fd1bba35ee7a7cf1b2924bae45d
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 5d7cb43  MINOR: Remove duplicate `subscribe` call in ConsumerPerformance (#5828)
5d7cb43 is described below

commit 5d7cb438a5607fd1bba35ee7a7cf1b2924bae45d
Author: huxi
AuthorDate: Sun Oct 28 12:45:59 2018 +0800

    MINOR: Remove duplicate `subscribe` call in ConsumerPerformance (#5828)

    In the `consume` method, the consumer subscribes the topic, so no need
    to do the same thing before the method call. Also include minor
    clean-up in `consume`.

    Reviewers: Ismael Juma
---
 core/src/main/scala/kafka/tools/ConsumerPerformance.scala | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 2e7b8dd..a065204 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import kafka.utils.{CommandLineUtils, ToolsUtils}
-import java.util.{Collections, Properties, Random}
+import java.util.{Properties, Random}
 import java.text.SimpleDateFormat
 import java.time.Duration
 
@@ -54,7 +54,6 @@ object ConsumerPerformance extends LazyLogging {
 
     var startMs, endMs = 0L
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
-    consumer.subscribe(Collections.singletonList(config.topic))
     startMs = System.currentTimeMillis
     consume(consumer, List(config.topic), config.numMessages, config.recordFetchTimeoutMs, config, totalMessagesRead, totalBytesRead, joinGroupTimeInMs, startMs)
     endMs = System.currentTimeMillis
@@ -121,10 +120,9 @@ object ConsumerPerformance extends LazyLogging {
       }})
 
     // Now start the benchmark
-    val startMs = System.currentTimeMillis
-    var lastReportTime: Long = startMs
-    var lastConsumedTime = System.currentTimeMillis
-    var currentTimeMillis = lastConsumedTime
+    var currentTimeMillis = System.currentTimeMillis
+    var lastReportTime: Long = currentTimeMillis
+    var lastConsumedTime = currentTimeMillis
 
     while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
       val records = consumer.poll(Duration.ofMillis(100)).asScala
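
For readers following the clean-up in `consume`, here is a rough, self-contained sketch of the timing loop after this change. It is an illustration under assumptions, not the code in ConsumerPerformance.scala: `TimeoutLoopSketch` and the `poll` function are hypothetical stand-ins for the tool and for `consumer.poll(...)`, and the loop body (refreshing the clock and resetting `lastConsumedTime` when records arrive) is assumed, since the diff above is cut off before it. `lastReportTime` is left out for brevity.

```scala
object TimeoutLoopSketch {
  // `poll` is a hypothetical stand-in for consumer.poll(...): it returns the
  // number of records fetched by one call. No Kafka client is involved.
  def consume(poll: () => Int, count: Long, timeoutMs: Long): Long = {
    var messagesRead = 0L
    // A single clock read seeds both timestamps, mirroring the clean-up
    // in the diff (previously the clock was read twice).
    var currentTimeMillis = System.currentTimeMillis
    var lastConsumedTime = currentTimeMillis

    // Stop once enough messages were read, or when no records have
    // arrived for longer than timeoutMs.
    while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeoutMs) {
      val fetched = poll()
      currentTimeMillis = System.currentTimeMillis
      // Assumed loop body: a non-empty fetch resets the idle timer.
      if (fetched > 0) lastConsumedTime = currentTimeMillis
      messagesRead += fetched
    }
    messagesRead
  }

  def main(args: Array[String]): Unit = {
    // Batches of 2 and 3 records reach the target of 5, so the third
    // batch is never requested.
    val batches = Iterator(2, 3, 4)
    val read = consume(() => batches.next(), count = 5, timeoutMs = 1000)
    println(s"read $read messages") // prints "read 5 messages"
  }
}
```

Seeding both timestamps from one read keeps the first `currentTimeMillis - lastConsumedTime` comparison at exactly zero, which is the effect of the three replacement lines in the last hunk.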