kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1459 kafka.tools.ConsumerOffsetChecker throws NoNodeException; reviewed by Neha Narkhede
Date Tue, 27 May 2014 20:43:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 01ea4eb42 -> 0ccc1dc08


KAFKA-1459 kafka.tools.ConsumerOffsetChecker throws NoNodeException; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0ccc1dc0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0ccc1dc0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0ccc1dc0

Branch: refs/heads/trunk
Commit: 0ccc1dc08e3664bb2c9bc1548d4d7966e14cc7f9
Parents: 01ea4eb
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Tue May 27 13:43:24 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue May 27 13:43:32 2014 -0700

----------------------------------------------------------------------
 .../kafka/tools/ConsumerOffsetChecker.scala     | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0ccc1dc0/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 88f824f..ae83e4d 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -29,7 +29,7 @@ import kafka.client.ClientUtils
 import kafka.network.BlockingChannel
 import kafka.api.PartitionOffsetRequestInfo
 import scala.Some
-
+import org.I0Itec.zkclient.exception.ZkNoNodeException
 
 object ConsumerOffsetChecker extends Logging {
 
@@ -160,21 +160,28 @@ object ConsumerOffsetChecker extends Logging {
 
       topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
       val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
-
       val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
-      
+
       debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
       channel.send(OffsetFetchRequest(group, topicPartitions))
       val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
       debug("Received offset fetch response %s.".format(offsetFetchResponse))
-      
+
       offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
         if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
           val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
           // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
           // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
-          val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
-          offsetMap.put(topicAndPartition, offset)
+          try {
+            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
+            offsetMap.put(topicAndPartition, offset)
+          } catch {
+            case z: ZkNoNodeException =>
+              if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir))
+                offsetMap.put(topicAndPartition,0)
+              else
+                throw z
+          }
         }
         else if (offsetAndMetadata.error == ErrorMapping.NoError)
           offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
@@ -217,4 +224,3 @@ object ConsumerOffsetChecker extends Logging {
     }
   }
 }
-


Mime
View raw message