kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3645; Fix ConsumerGroupCommand and ConsumerOffsetChecker to correctly read endpoint info from ZK
Date Sat, 04 Jun 2016 08:42:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 49ddc897b -> ff300c9d4


KAFKA-3645; Fix ConsumerGroupCommand and ConsumerOffsetChecker to correctly read endpoint
info from ZK

The host and port entries under /brokers/ids/<bid> get filled only for the PLAINTEXT security
protocol. For other protocols the host is null and the actual endpoint is under "endpoints".
This causes an NPE when running the consumer group and offset checker scripts in a kerberized
environment. By always reading the host and port values from "endpoints", a more meaningful
exception is thrown rather than an NPE.

Author: Arun Mahadevan <aiyer@hortonworks.com>

Reviewers: Sriharsha Chintalapani <schintalapani@hortonworks.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1301 from arunmahadevan/cg_kerb_fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff300c9d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff300c9d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff300c9d

Branch: refs/heads/trunk
Commit: ff300c9d4f45e4a355db11258965c3a3a6f6bbf7
Parents: 49ddc89
Author: Arun Mahadevan <aiyer@hortonworks.com>
Authored: Sat Jun 4 09:24:45 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jun 4 09:24:45 2016 +0100

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 20 +++++------------
 .../kafka/tools/ConsumerOffsetChecker.scala     | 23 ++++++--------------
 2 files changed, 12 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff300c9d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index b086d8f..f0c817f 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.BrokerNotAvailableException
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
@@ -277,20 +277,10 @@ object ConsumerGroupCommand {
 
     private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
       try {
-        zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
-          case Some(brokerInfoString) =>
-            Json.parseFull(brokerInfoString) match {
-              case Some(m) =>
-                val brokerInfo = m.asInstanceOf[Map[String, Any]]
-                val host = brokerInfo.get("host").get.asInstanceOf[String]
-                val port = brokerInfo.get("port").get.asInstanceOf[Int]
-                Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerGroupCommand"))
-              case None =>
-                throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
-            }
-          case None =>
-            throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
-        }
+        zkUtils.getBrokerInfo(brokerId)
+          .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+          .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000,
"ConsumerGroupCommand"))
+          .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)))
       } catch {
         case t: Throwable =>
           println("Could not parse broker info due to " + t.getMessage)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff300c9d/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 5c01f34..8f86f66 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -21,11 +21,12 @@ package kafka.tools
 import joptsimple._
 import kafka.utils._
 import kafka.consumer.SimpleConsumer
-import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest}
+import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest}
 import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
 import org.apache.kafka.common.errors.BrokerNotAvailableException
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.security.JaasUtils
+
 import scala.collection._
 import kafka.client.ClientUtils
 import kafka.network.BlockingChannel
@@ -40,20 +41,10 @@ object ConsumerOffsetChecker extends Logging {
 
   private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = {
     try {
-      zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
-        case Some(brokerInfoString) =>
-          Json.parseFull(brokerInfoString) match {
-            case Some(m) =>
-              val brokerInfo = m.asInstanceOf[Map[String, Any]]
-              val host = brokerInfo.get("host").get.asInstanceOf[String]
-              val port = brokerInfo.get("port").get.asInstanceOf[Int]
-              Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker"))
-            case None =>
-              throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
-          }
-        case None =>
-          throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
-      }
+      zkUtils.getBrokerInfo(bid)
+        .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+        .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000,
"ConsumerOffsetChecker"))
+        .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)))
     } catch {
       case t: Throwable =>
         println("Could not parse broker info due to " + t.getCause)


Mime
View raw message