Updated Branches:
refs/heads/0.8 1c3660568 -> cd3b79699
KAFKA-1086 Improve GetOffsetShell to find metadata automatically; reviwed by Jun Rao and Joel
Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd3b7969
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd3b7969
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd3b7969
Branch: refs/heads/0.8
Commit: cd3b79699341afb8d52c51d9ac7317d93c32eeb6
Parents: 1c36605
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Wed Oct 16 16:31:05 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Oct 16 16:31:05 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/tools/GetOffsetShell.scala | 70 ++++++++++++++------
kafka-patch-review.py | 2 +-
2 files changed, 49 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3b7969/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 2b9438a..91c4d28 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -23,25 +23,27 @@ import joptsimple._
import java.net.URI
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
import kafka.common.TopicAndPartition
+import kafka.client.ClientUtils
+import kafka.utils.CommandLineUtils
object GetOffsetShell {
def main(args: Array[String]): Unit = {
val parser = new OptionParser
- val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect
to.")
+ val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and
port of the server to connect to.")
.withRequiredArg
- .describedAs("kafka://hostname:port")
+ .describedAs("hostname:port,...,hostname:port")
.ofType(classOf[String])
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
- val partitionOpt = parser.accepts("partition", "partition id")
+ val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids.
If not specified, it will find offsets for all partitions")
.withRequiredArg
- .describedAs("partition id")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(0)
+ .describedAs("partition ids")
+ .ofType(classOf[String])
+ .defaultsTo("")
val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
@@ -51,28 +53,52 @@ object GetOffsetShell {
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
+ val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request
waits.")
+ .withRequiredArg
+ .describedAs("ms")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1000)
val options = parser.parse(args : _*)
- for(arg <- List(urlOpt, topicOpt, timeOpt)) {
- if(!options.has(arg)) {
- System.err.println("Missing required argument \"" + arg + "\"")
- parser.printHelpOn(System.err)
- System.exit(1)
- }
- }
+ CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, timeOpt)
- val url = new URI(options.valueOf(urlOpt))
+ val clientId = "GetOffsetShell"
+ val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
val topic = options.valueOf(topicOpt)
- val partition = options.valueOf(partitionOpt).intValue
+ var partitionList = options.valueOf(partitionOpt)
var time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
- val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
- val topicAndPartition = TopicAndPartition(topic, partition)
- val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time,
nOffsets)))
- val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
- println("get " + offsets.length + " results")
- for (offset <- offsets)
- println(offset)
+ val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
+
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers,
clientId, maxWaitMs).topicsMetadata
+ if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+ System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the
topic does not exist, run ").format(topic) +
+ "kafka-list-topic.sh to verify")
+ System.exit(1)
+ }
+ val partitions =
+ if(partitionList == "") {
+ topicsMetadata.head.partitionsMetadata.map(_.partitionId)
+ } else {
+ partitionList.split(",").map(_.toInt).toSeq
+ }
+ partitions.foreach { partitionId =>
+ val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId
== partitionId)
+ partitionMetadataOpt match {
+ case Some(metadata) =>
+ metadata.leader match {
+ case Some(leader) =>
+ val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000,
clientId)
+ val topicAndPartition = TopicAndPartition(topic, partitionId)
+ val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time,
nOffsets)))
+ val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
+
+ println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
+ case None => System.err.println("Error: partition %d does not have a leader.
Skip getting offsets".format(partitionId))
+ }
+ case None => System.err.println("Error: partition %d does not exist".format(partitionId))
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd3b7969/kafka-patch-review.py
----------------------------------------------------------------------
diff --git a/kafka-patch-review.py b/kafka-patch-review.py
index f1d5192..2653465 100644
--- a/kafka-patch-review.py
+++ b/kafka-patch-review.py
@@ -90,7 +90,7 @@ def main():
comment="Created reviewboard "
if not opt.reviewboard:
- print 'Created a new reviewboard ',rb_url
+ print 'Created a new reviewboard ',rb_url,' against branch: ',opt.branch
else:
print 'Updated reviewboard',opt.reviewboard
comment="Updated reviewboard "
|