kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-3356: Remove ConsumerOffsetChecker
Date Sun, 24 Sep 2017 02:09:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 271f6b5ae -> a0f533266


KAFKA-3356: Remove ConsumerOffsetChecker

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3036 from mimaison/KAFKA-3356
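
For anyone still relying on the removed tool, the intended replacement is
kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand). A typical
invocation would look roughly like the sketch below; the broker address
and group name are placeholders:

    # describe a consumer group against a broker (placeholder host/group)
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

This reports per-partition committed offset, log-end offset and lag for the
group, broadly the same information the removed checker printed.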


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0f53326
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0f53326
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0f53326

Branch: refs/heads/trunk
Commit: a0f533266a8a51a6288eda64f3a80242af13e2f9
Parents: 271f6b5
Author: Mickael Maison <mickael.maison@gmail.com>
Authored: Sun Sep 24 10:08:52 2017 +0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Sep 24 10:08:52 2017 +0800

----------------------------------------------------------------------
 bin/kafka-consumer-offset-checker.sh            |  17 --
 .../kafka/tools/ConsumerOffsetChecker.scala     | 209 -------------------
 docs/upgrade.html                               |   8 +-
 3 files changed, 4 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a0f53326/bin/kafka-consumer-offset-checker.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-consumer-offset-checker.sh b/bin/kafka-consumer-offset-checker.sh
deleted file mode 100755
index 5993345..0000000
--- a/bin/kafka-consumer-offset-checker.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker "$@"
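
The removed script was only a thin wrapper that handed its arguments to
kafka-run-class.sh. The surviving bin/kafka-consumer-groups.sh follows the
same pattern, delegating to the ConsumerGroupCommand class; a sketch of such
a wrapper (the script actually shipped in the repository may differ in
details) is:

    #!/bin/bash
    # delegate to the replacement tool's main class via the shared launcher
    exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"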

http://git-wip-us.apache.org/repos/asf/kafka/blob/a0f53326/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
deleted file mode 100644
index 87147dc..0000000
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-
-import joptsimple._
-import kafka.utils._
-import kafka.consumer.SimpleConsumer
-import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest}
-import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
-import org.apache.kafka.common.errors.BrokerNotAvailableException
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.security.JaasUtils
-
-import scala.collection._
-import kafka.client.ClientUtils
-import kafka.network.BlockingChannel
-import kafka.api.PartitionOffsetRequestInfo
-import org.I0Itec.zkclient.exception.ZkNoNodeException
-import org.apache.kafka.common.network.ListenerName
-
-@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
-object ConsumerOffsetChecker extends Logging {
-
-  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
-  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
-  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()
-
-  private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = {
-    try {
-      zkUtils.getBrokerInfo(bid)
-        .map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
-        .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerOffsetChecker"))
-        .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)))
-    } catch {
-      case t: Throwable =>
-        println("Could not parse broker info due to " + t.getCause)
-        None
-    }
-  }
-
-  private def processPartition(zkUtils: ZkUtils,
-                               group: String, topic: String, producerId: Int) {
-    val topicPartition = TopicAndPartition(topic, producerId)
-    val offsetOpt = offsetMap.get(topicPartition)
-    val groupDirs = new ZKGroupTopicDirs(group, topic)
-    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(producerId))._1
-    zkUtils.getLeaderForPartition(topic, producerId) match {
-      case Some(bid) =>
-        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
-        consumerOpt.foreach { consumer =>
-          val topicAndPartition = TopicAndPartition(topic, producerId)
-          val request =
-            OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-          val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-
-          val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
-          println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId,
offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
-                                                                 owner match {case Some(ownerStr)
=> ownerStr case None => "none"}))
-        }
-      case None =>
-        println("No broker for partition %s - %s".format(topic, producerId))
-    }
-  }
-
-  private def processTopic(zkUtils: ZkUtils, group: String, topic: String) {
-    topicPidMap.get(topic).foreach { producerIds =>
-      producerIds.sorted.foreach {
-        producerId => processPartition(zkUtils, group, topic, producerId)
-        }
-    }
-  }
-
-  private def printBrokerInfo() {
-    println("BROKER INFO")
-    for ((bid, consumerOpt) <- consumerMap)
-      consumerOpt.foreach { consumer =>
-        println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
-      }
-  }
-
-  def main(args: Array[String]) {
-    warn("WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following
0.9.0. Use ConsumerGroupCommand instead.")
-
-    val parser = new OptionParser(false)
-
-    val zkConnectOpt = parser.accepts("zookeeper", "ZooKeeper connect string.").
-            withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
-    val topicsOpt = parser.accepts("topic",
-            "Comma-separated list of consumer topics (all topics if absent).").
-            withRequiredArg().ofType(classOf[String])
-    val groupOpt = parser.accepts("group", "Consumer group.").
-            withRequiredArg().ofType(classOf[String])
-    val channelSocketTimeoutMsOpt = parser.accepts("socket.timeout.ms", "Socket timeout to use when querying for offsets.").
-            withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(6000)
-    val channelRetryBackoffMsOpt = parser.accepts("retry.backoff.ms", "Retry back-off to use for failed offset queries.").
-            withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(3000)
-
-    parser.accepts("broker-info", "Print broker info")
-    parser.accepts("help", "Print this message.")
-
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "Check the offset of your consumers.")
-
-    val options = parser.parse(args : _*)
-
-    if (options.has("help")) {
-       parser.printHelpOn(System.out)
-       Exit.exit(0)
-    }
-
-    CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt)
-
-    val zkConnect = options.valueOf(zkConnectOpt)
-
-    val group = options.valueOf(groupOpt)
-    val groupDirs = new ZKGroupDirs(group)
-
-    val channelSocketTimeoutMs = options.valueOf(channelSocketTimeoutMsOpt).intValue()
-    val channelRetryBackoffMs = options.valueOf(channelRetryBackoffMsOpt).intValue()
-
-    val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt)) else None
-
-    var zkUtils: ZkUtils = null
-    var channel: BlockingChannel = null
-    try {
-      zkUtils = ZkUtils(zkConnect,
-                        30000,
-                        30000,
-                        JaasUtils.isZkSecurityEnabled())
-
-      val topicList = topics match {
-        case Some(x) => x.split(",").view.toList
-        case None => zkUtils.getChildren(groupDirs.consumerGroupDir +  "/owners").toList
-      }
-
-      topicPidMap = immutable.Map(zkUtils.getPartitionsForTopics(topicList).toSeq:_*)
-      val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
-      channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
-
-      debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
-      channel.send(OffsetFetchRequest(group, topicPartitions))
-      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
-      debug("Received offset fetch response %s.".format(offsetFetchResponse))
-
-      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
-        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
-          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
-          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
-          // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
-          try {
-            val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
-            offsetMap.put(topicAndPartition, offset)
-          } catch {
-            case z: ZkNoNodeException =>
-              if(zkUtils.pathExists(topicDirs.consumerOffsetDir))
-                offsetMap.put(topicAndPartition,-1)
-              else
-                throw z
-          }
-        }
-        else if (offsetAndMetadata.error == Errors.NONE)
-          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
-        else {
-          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, offsetAndMetadata.error.exception))
-        }
-      }
-      channel.disconnect()
-      channel = null
-
-      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset",
"logSize", "Lag", "Owner"))
-      topicList.sorted.foreach {
-        topic => processTopic(zkUtils, group, topic)
-      }
-
-      if (options.has("broker-info"))
-        printBrokerInfo()
-
-      consumerMap.values.flatten.foreach(_.close())
-    }
-    catch {
-      case t: Throwable =>
-        println("Exiting due to: %s.".format(t.getMessage))
-    }
-    finally {
-      consumerMap.values.flatten.foreach(_.close())
-      if (zkUtils != null)
-        zkUtils.close()
-
-      if (channel != null)
-        channel.disconnect()
-    }
-  }
-}
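
Unlike the class deleted above, which read group metadata straight out of
ZooKeeper and fetched log-end offsets through the old SimpleConsumer and
BlockingChannel code paths, ConsumerGroupCommand obtains offsets for
new-consumer groups from the group coordinator over the regular broker
protocol, so it is pointed at a broker rather than at ZooKeeper. A rough
sketch of the equivalent offset-checking workflow (host and group name are
placeholders):

    # list the consumer groups known to the cluster
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

    # show per-partition current offset, log-end offset and lag for one group
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

The columns the removed tool printed (Group, Topic, Pid, Offset, logSize,
Lag, Owner) correspond roughly to the topic, partition, current-offset,
log-end-offset, lag and consumer-id fields of the describe output.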

http://git-wip-us.apache.org/repos/asf/kafka/blob/a0f53326/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index a98bdea..ce750ea 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -66,6 +66,7 @@
         This was only intended for use on the broker, but it is no longer in use and the implementations have not been maintained.
         A stub implementation has been retained for binary compatibility.</li>
     <li>The Java clients and tools now accept any string as a client-id.</li>
+    <li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group details.</li>
 </ul>
 
 <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol
Versions</a></h5>
@@ -163,7 +164,7 @@
         This config specifies the time, in milliseconds, that the <code>GroupCoordinator</code> will delay the initial consumer rebalance.
         The rebalance will be further delayed by the value of <code>group.initial.rebalance.delay.ms</code> as new members join the group, up to a maximum of <code>max.poll.interval.ms</code>.
         The default value for this is 3 seconds.
-        During development and testing it might be desirable to set this to 0 inorder to not delay test execution time.
+        During development and testing it might be desirable to set this to 0 in order to not delay test execution time.
     </li>
     <li><code>org.apache.kafka.common.Cluster#partitionsForTopic</code>, <code>partitionsForNode</code> and <code>availablePartitionsForTopic</code> methods
         will return an empty list instead of <code>null</code> (which is considered a bad practice) in case the metadata for the required topic does not exist.
@@ -205,7 +206,7 @@
     tool.</li>
   <li>EoS in Kafka introduces new request APIs and modifies several existing ones. See
     <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-RPCProtocolSummary">KIP-98</a>
-    for the full details</code></li>
+    for the full details</li>
 </ol>
 
 <h5><a id="upgrade_11_message_format" href="#upgrade_11_message_format">Notes
on the new message format in 0.11.0</a></h5>
@@ -236,7 +237,7 @@
   is already not possible in that case. In order to avoid the cost of down-conversion, you should ensure that consumer applications
   are upgraded to the latest 0.11.0 client. Significantly, since the old consumer has been deprecated in 0.11.0.0, it does not support
   the new message format. You must upgrade to use the new consumer to use the new message format without the cost of down-conversion.
-  Note that 0.11.0 consumers support backwards compability with brokers 0.10.0 brokers and upward, so it is possible to upgrade the
+  Note that 0.11.0 consumers support backwards compatibility with brokers 0.10.0 brokers and upward, so it is possible to upgrade the
   clients first before the brokers.
 </p>
 
@@ -483,7 +484,6 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
         <code>def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)</code> </li>
     <li> MessageReader interface was changed from <code>def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]</code> to
         <code>def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]</code> </li>
-    </li>
     <li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
     <li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code> </li>
     <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>

