kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1397422 - in /incubator/kafka/branches/0.8: bin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/ core/src/main/scala...
Date Fri, 12 Oct 2012 02:56:29 GMT
Author: nehanarkhede
Date: Fri Oct 12 02:56:28 2012
New Revision: 1397422

URL: http://svn.apache.org/viewvc?rev=1397422&view=rev
Log:
KAFKA-432 allow consumer to read from followers; patched by Yang Ye; reviewed by Neha and Jun
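
For context: the patch distinguishes fetchers by the replicaId field of a request. Brokers send their own (non-negative) broker id, ordinary consumers send -1, and the new low-level/debugging consumer sends -2, which the broker will serve even from a follower replica. A minimal sketch of the convention, mirroring the constants and predicates added in this commit (not itself part of the patch):

    // Sketch of the replicaId convention; see RequestOrResponse.scala and
    // FetchRequest.scala below for the actual definitions.
    object Request {
      val OrdinaryConsumerId: Int = -1   // regular consumer, capped at the high watermark
      val DebuggingConsumerId: Int = -2  // low-level consumer, may read from any replica
    }

    def isFromFollower(replicaId: Int): Boolean =
      replicaId != Request.OrdinaryConsumerId && replicaId != Request.DebuggingConsumerId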

Modified:
    incubator/kafka/branches/0.8/bin/kafka-consumer-shell.sh
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala

Modified: incubator/kafka/branches/0.8/bin/kafka-consumer-shell.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-consumer-shell.sh?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-consumer-shell.sh (original)
+++ incubator/kafka/branches/0.8/bin/kafka-consumer-shell.sh Fri Oct 12 02:56:28 2012
@@ -1,17 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-$(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerShell $@

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Fri Oct 12 02:56:28 2012
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 import kafka.utils.{nonthreadsafe, Utils}
 import scala.collection.immutable.Map
 import kafka.common.TopicAndPartition
+import kafka.consumer.ConsumerConfig
 
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -30,8 +31,6 @@ object FetchRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
-  val DefaultMaxWait = 0
-  val DefaultMinBytes = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
@@ -58,9 +57,9 @@ object FetchRequest {
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         correlationId: Int = FetchRequest.DefaultCorrelationId,
                         clientId: String = FetchRequest.DefaultClientId,
-                        replicaId: Int = Request.DefaultReplicaId,
-                        maxWait: Int = FetchRequest.DefaultMaxWait,
-                        minBytes: Int = FetchRequest.DefaultMinBytes,
+                        replicaId: Int = Request.OrdinaryConsumerId,
+                        maxWait: Int = ConsumerConfig.MaxFetchWaitMs,
+                        minBytes: Int = ConsumerConfig.MinFetchBytes,
                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
         extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
 
@@ -111,7 +110,11 @@ case class FetchRequest(versionId: Short
     })
   }
 
-  def isFromFollower = replicaId != Request.NonFollowerId
+  def isFromFollower = replicaId != Request.OrdinaryConsumerId && replicaId != Request.DebuggingConsumerId
+
+  def isFromOrdinaryConsumer = replicaId == Request.OrdinaryConsumerId
+
+  def isFromLowLevelConsumer = replicaId == Request.DebuggingConsumerId
 
   def numPartitions = requestInfo.size
 }
@@ -122,9 +125,9 @@ class FetchRequestBuilder() {
   private var correlationId = FetchRequest.DefaultCorrelationId
   private val versionId = FetchRequest.CurrentVersion
   private var clientId = FetchRequest.DefaultClientId
-  private var replicaId = Request.DefaultReplicaId
-  private var maxWait = FetchRequest.DefaultMaxWait
-  private var minBytes = FetchRequest.DefaultMinBytes
+  private var replicaId = Request.OrdinaryConsumerId
+  private var maxWait = ConsumerConfig.MaxFetchWaitMs
+  private var minBytes = ConsumerConfig.MinFetchBytes
   private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
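
With these defaults, a request built through FetchRequestBuilder identifies itself as an ordinary consumer unless the caller overrides replicaId. A sketch of a debugging fetch, using only builder methods that appear in this commit (topic name, client id, and sizes are placeholders):

    import kafka.api.{FetchRequestBuilder, Request}
    import kafka.consumer.ConsumerConfig

    // Sketch: a fetch that identifies itself as the debugging consumer,
    // so the broker will serve it even from a follower replica.
    val fetchRequest = new FetchRequestBuilder()
      .clientId("debug-client")
      .replicaId(Request.DebuggingConsumerId)    // -2: not subject to the HW cap
      .addFetch("my-topic", 0, 0L, 1024 * 1024)  // topic, partition, offset, fetchSize
      .maxWait(1000)
      .minBytes(ConsumerConfig.MinFetchBytes)
      .build()

    assert(fetchRequest.isFromLowLevelConsumer)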

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Fri Oct 12 02:56:28 2012
@@ -55,9 +55,11 @@ case class PartitionOffsetRequestInfo(ti
 case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
                          versionId: Short = OffsetRequest.CurrentVersion,
                          clientId: String = OffsetRequest.DefaultClientId,
-                         replicaId: Int = Request.DefaultReplicaId)
+                         replicaId: Int = Request.OrdinaryConsumerId)
         extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
 
+  def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, replicaId)
+
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
   def writeTo(buffer: ByteBuffer) {
@@ -96,5 +98,6 @@ case class OffsetRequest(requestInfo: Ma
       )
     })
 
-  def isFromFollower = replicaId != Request.NonFollowerId
+  def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId
+  def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId
 }
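
The auxiliary constructor exists so callers can set replicaId without spelling out versionId and clientId. A sketch, with a placeholder topic and partition:

    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, Request}
    import kafka.common.TopicAndPartition

    // Sketch: an offset request issued as the debugging consumer; the broker
    // then answers from any local replica, not only the leader.
    val tp = TopicAndPartition("my-topic", 0)
    val request = new OffsetRequest(
      Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)),
      Request.DebuggingConsumerId)

    assert(request.isFromDebuggingClient)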

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala Fri Oct 12 02:56:28 2012
@@ -26,8 +26,8 @@ object RequestOrResponse {
 
 
 object Request {
-  val DefaultReplicaId = -1
-  val NonFollowerId = DefaultReplicaId
+  val OrdinaryConsumerId: Int = -1
+  val DebuggingConsumerId: Int = -2
 }
 
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Fri Oct 12 02:56:28 2012
@@ -41,7 +41,7 @@ class Replica(val brokerId: Int,
       logEndOffsetValue.set(newLogEndOffset)
       logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
       trace("Setting log end offset for replica %d for topic %s partition %d to %d"
-            .format(brokerId, topic, partitionId, logEndOffsetValue))
+            .format(brokerId, topic, partitionId, logEndOffsetValue.get()))
     } else
       throw new KafkaException("Shouldn't set logEndOffset for replica %d topic %s partition %d since it's local"
           .format(brokerId, topic, partitionId))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala Fri Oct 12 02:56:28 2012
@@ -20,6 +20,7 @@ package kafka.common
 /**
  * Thrown when a request is made for partition, but no leader exists for that partition
  */
-class ReplicaNotAvailableException(cause: Throwable) extends RuntimeException(cause) {
-  def this() = this(null)
+class ReplicaNotAvailableException(cause: Throwable, message: String = "") extends RuntimeException(cause) {
+  def this() = this(null, "")
+  def this(message: String) = this(null, message)
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Fri Oct 12 02:56:28 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -38,79 +38,79 @@ object ConsoleConsumer extends Logging {
   def main(args: Array[String]) {
     val parser = new OptionParser
     val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
-                           .withRequiredArg
-                           .describedAs("topic")
-                           .ofType(classOf[String])
+            .withRequiredArg
+            .describedAs("topic")
+            .ofType(classOf[String])
     val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
-                             .withRequiredArg
-                             .describedAs("whitelist")
-                             .ofType(classOf[String])
+            .withRequiredArg
+            .describedAs("whitelist")
+            .ofType(classOf[String])
     val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
-                             .withRequiredArg
-                             .describedAs("blacklist")
-                             .ofType(classOf[String])
+            .withRequiredArg
+            .describedAs("blacklist")
+            .ofType(classOf[String])
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-                                      "Multiple URLS can be given to allow fail-over.")
-                           .withRequiredArg
-                           .describedAs("urls")
-                           .ofType(classOf[String])
+            "Multiple URLS can be given to allow fail-over.")
+            .withRequiredArg
+            .describedAs("urls")
+            .ofType(classOf[String])
     val groupIdOpt = parser.accepts("group", "The group id to consume on.")
-                           .withRequiredArg
-                           .describedAs("gid")
-                           .defaultsTo("console-consumer-" + new Random().nextInt(100000))   
-                           .ofType(classOf[String])
+            .withRequiredArg
+            .describedAs("gid")
+            .defaultsTo("console-consumer-" + new Random().nextInt(100000))
+            .ofType(classOf[String])
     val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
-                           .withRequiredArg
-                           .describedAs("size")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1024 * 1024)
+            .withRequiredArg
+            .describedAs("size")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(1024 * 1024)
     val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.")
-                           .withRequiredArg
-                           .describedAs("bytes")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
+            .withRequiredArg
+            .describedAs("bytes")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(1)
     val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
-                           .withRequiredArg
-                           .describedAs("ms")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(100)
+            .withRequiredArg
+            .describedAs("ms")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(100)
     val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
-                           .withRequiredArg
-                           .describedAs("size")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(2 * 1024 * 1024)
+            .withRequiredArg
+            .describedAs("size")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(2 * 1024 * 1024)
     val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " +
-                                              "of time without incoming messages")
-                           .withRequiredArg
-                           .describedAs("prop")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(-1)
+            "of time without incoming messages")
+            .withRequiredArg
+            .describedAs("prop")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(-1)
     val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
-                           .withRequiredArg
-                           .describedAs("class")
-                           .ofType(classOf[String])
-                           .defaultsTo(classOf[NewlineMessageFormatter].getName)
+            .withRequiredArg
+            .describedAs("class")
+            .ofType(classOf[String])
+            .defaultsTo(classOf[NewlineMessageFormatter].getName)
     val messageFormatterArgOpt = parser.accepts("property")
-                           .withRequiredArg
-                           .describedAs("prop")
-                           .ofType(classOf[String])
+            .withRequiredArg
+            .describedAs("prop")
+            .ofType(classOf[String])
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
-        "start with the earliest message present in the log rather than the latest message.")
+            "start with the earliest message present in the log rather than the latest message.")
     val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms")
-                           .withRequiredArg
-                           .describedAs("ms")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(10*1000)
+            .withRequiredArg
+            .describedAs("ms")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(10*1000)
     val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
-                           .withRequiredArg
-                           .describedAs("num_messages")
-                           .ofType(classOf[java.lang.Integer])
+            .withRequiredArg
+            .describedAs("num_messages")
+            .ofType(classOf[java.lang.Integer])
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
-        "skip it instead of halt.")
+            "skip it instead of halt.")
 
     val options: OptionSet = tryParse(parser, args)
     Utils.checkRequiredArgs(parser, options, zkConnectOpt)
-    
+
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
     if (topicOrFilterOpt.size != 1) {
       error("Exactly one of whitelist/blacklist/topic is required.")
@@ -136,10 +136,10 @@ object ConsoleConsumer extends Logging {
     props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
     val config = new ConsumerConfig(props)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
-    
+
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
-    
+    val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
+
     val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
 
     val connector = Consumer.create(config)
@@ -151,7 +151,7 @@ object ConsoleConsumer extends Logging {
       override def run() {
         connector.shutdown()
         // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
-        if(!options.has(groupIdOpt))  
+        if(!options.has(groupIdOpt))
           tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
       }
     })
@@ -201,7 +201,22 @@ object ConsoleConsumer extends Logging {
       }
     }
   }
-  
+
+  def tryCleanupZookeeper(zkUrl: String, groupId: String) {
+    try {
+      val dir = "/consumers/" + groupId
+      info("Cleaning up temporary zookeeper data under " + dir + ".")
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _ => // swallow
+    }
+  }
+}
+
+
+object MessageFormatter {
   def tryParseFormatterArgs(args: Iterable[String]): Properties = {
     val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
     if(!splits.forall(_.length == 2)) {
@@ -213,69 +228,56 @@ object ConsoleConsumer extends Logging {
       props.put(a(0), a(1))
     props
   }
-  
-  trait MessageFormatter {
-    def writeTo(message: Message, output: PrintStream)
-    def init(props: Properties) {}
-    def close() {}
+}
+
+trait MessageFormatter {
+  def writeTo(message: Message, output: PrintStream)
+  def init(props: Properties) {}
+  def close() {}
+}
+
+class DecodedMessageFormatter extends MessageFormatter {
+  var topicStr: String = _
+  val decoder = new StringDecoder()
+
+  override def init(props: Properties) {
+    topicStr = props.getProperty("topic")
+    if (topicStr != null)
+      topicStr = topicStr + ":"
+    else
+      topicStr = ""
   }
-  
-  class NewlineMessageFormatter extends MessageFormatter {
-    def writeTo(message: Message, output: PrintStream) {
-      val payload = message.payload
-      output.write(payload.array, payload.arrayOffset, payload.limit)
-      output.write('\n')
+
+  def writeTo(message: Message, output: PrintStream) {
+    try {
+      output.println(topicStr + decoder.toEvent(message) + ":payloadsize:" + message.payloadSize)
+    } catch {
+      case e => e.printStackTrace()
     }
   }
+}
 
-  class ChecksumMessageFormatter extends MessageFormatter {
-    private var topicStr: String = _
-    
-    override def init(props: Properties) {
-      topicStr = props.getProperty("topic")
-      if (topicStr != null) 
-        topicStr = topicStr + ":"
-      else
-        topicStr = ""
-    }
-    
-    def writeTo(message: Message, output: PrintStream) {
-      val chksum = message.checksum
-      output.println(topicStr + "checksum:" + chksum)
-    }
+class NewlineMessageFormatter extends MessageFormatter {
+  def writeTo(message: Message, output: PrintStream) {
+    val payload = message.payload
+    output.write(payload.array, payload.arrayOffset, payload.limit)
+    output.write('\n')
   }
-  
-  class DecodedMessageFormatter extends MessageFormatter {
-    var topicStr: String = _
-    val decoder = new StringDecoder()
-    
-    override def init(props: Properties) {
-      topicStr = props.getProperty("topic")
-      if (topicStr != null) 
-        topicStr = topicStr + ":"
-      else
-        topicStr = ""
-    }
-    
-    def writeTo(message: Message, output: PrintStream) {
-      try {
-        output.println(topicStr + decoder.toEvent(message) + ":payloadsize:" + message.payloadSize)
-      } catch {
-        case e => e.printStackTrace()
-      }
-    }
+}
+
+class ChecksumMessageFormatter extends MessageFormatter {
+  private var topicStr: String = _
+
+  override def init(props: Properties) {
+    topicStr = props.getProperty("topic")
+    if (topicStr != null)
+      topicStr = topicStr + ":"
+    else
+      topicStr = ""
   }
-  
-  def tryCleanupZookeeper(zkUrl: String, groupId: String) {
-    try {
-      val dir = "/consumers/" + groupId
-      info("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
-      zk.deleteRecursive(dir)
-      zk.close()
-    } catch {
-      case _ => // swallow
-    }
+
+  def writeTo(message: Message, output: PrintStream) {
+    val chksum = message.checksum
+    output.println(topicStr + "checksum:" + chksum)
   }
-   
 }
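
Now that MessageFormatter is a top-level trait (with its companion helper for parsing --property arguments), custom formatters can be plugged into either shell via the --formatter option. A minimal sketch of one (the class name is hypothetical):

    import java.io.PrintStream
    import java.util.Properties
    import kafka.consumer.MessageFormatter
    import kafka.message.Message

    // Sketch: a formatter that prints only the payload size of each message;
    // load it with --formatter <fully.qualified.ClassName>.
    class SizeMessageFormatter extends MessageFormatter {
      def writeTo(message: Message, output: PrintStream) {
        output.println("payload bytes: " + message.payloadSize)
      }
    }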

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Fri Oct 12 02:56:28 2012
@@ -28,14 +28,10 @@ class ConsumerFetcherThread(name: String
                             val config: ConsumerConfig,
                             sourceBroker: Broker,
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name, 
-                                      sourceBroker = sourceBroker, 
-                                      socketTimeout = config.socketTimeoutMs,
-                                      socketBufferSize = config.socketBufferSize, 
-                                      fetchSize = config.fetchSize,
-                                      fetcherBrokerId = Request.NonFollowerId, 
-                                      maxWait = config.maxFetchWaitMs,
-                                      minBytes = config.minFetchBytes) {
+        extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs,
+          socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
+          fetcherBrokerId = Request.OrdinaryConsumerId, maxWait = config.maxFetchWaitMs,
+          minBytes = config.minFetchBytes) {
 
   // process fetched data
   def processPartitionData(topic: String, fetchOffset: Long, partitionData: FetchResponsePartitionData) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Fri Oct 12 02:56:28 2012
@@ -22,6 +22,47 @@ import kafka.network._
 import kafka.utils._
 import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import kafka.utils.ZkUtils._
+import collection.immutable
+import kafka.common.{TopicAndPartition, KafkaException}
+import org.I0Itec.zkclient.ZkClient
+import kafka.cluster.Broker
+
+
+object SimpleConsumer extends Logging {
+  def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean): Long = {
+    var simpleConsumer: SimpleConsumer = null
+    var producedOffset: Long = -1L
+    try {
+      simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
+                                          ConsumerConfig.SocketBufferSize)
+      val topicAndPartition = TopicAndPartition(topic, partitionId)
+      val request = if(isFromOrdinaryConsumer)
+        OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
+      else
+        new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), Request.DebuggingConsumerId)
+      producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+    } catch {
+      case e =>
+        error("error in earliestOrLatestOffset() ", e)
+    }
+    finally {
+      if (simpleConsumer != null)
+        simpleConsumer.close()
+    }
+    producedOffset
+  }
+
+  def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = {
+    val cluster = getCluster(zkClient)
+    val broker = cluster.getBroker(brokerId) match {
+      case Some(b) => b
+      case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
+                                                    "getOffsetsBefore request")
+    }
+    earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, isFromOrdinaryConsumer)
+  }
+}
 
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Fri Oct 12 02:56:28 2012
@@ -27,7 +27,6 @@ import org.I0Itec.zkclient.exception.ZkN
 import java.net.InetAddress
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.api.PartitionOffsetRequestInfo
 import java.util.UUID
 import kafka.serializer.Decoder
 import kafka.utils.ZkUtils._
@@ -253,31 +252,7 @@ private[kafka] class ZookeeperConsumerCo
     }
   }
 
-  private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = {
-    var simpleConsumer: SimpleConsumer = null
-    var producedOffset: Long = -1L
-    try {
-      val cluster = getCluster(zkClient)
-      val broker = cluster.getBroker(brokerId) match {
-        case Some(b) => b
-        case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
-          "getOffsetsBefore request")
-      }
-      simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
-                                            ConsumerConfig.SocketBufferSize)
-      val topicAndPartition = TopicAndPartition(topic, partitionId)
-      val request = OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
-      producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-    } catch {
-      case e =>
-        error("error in earliestOrLatestOffset() ", e)
-    }
-    finally {
-      if (simpleConsumer != null)
-        simpleConsumer.close
-    }
-    producedOffset
-  }
+
 
   class ZKSessionExpireListener(val dirs: ZKGroupDirs,
                                  val consumerIdString: String,
@@ -611,9 +586,9 @@ private[kafka] class ZookeeperConsumerCo
           case None =>
             config.autoOffsetReset match {
               case OffsetRequest.SmallestTimeString =>
-                earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime)
+                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime)
               case OffsetRequest.LargestTimeString =>
-                earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime)
+                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime)
               case _ =>
                 throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
             }
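
The connector now delegates offset resolution to the helper that moved into the SimpleConsumer companion object, so the shell tools can share it. A usage sketch of the zkClient overload added in this commit (assumes a live ZkClient named zkClient in scope and a placeholder topic):

    import kafka.api.OffsetRequest
    import kafka.consumer.SimpleConsumer

    // Sketch: resolve the earliest offset of partition 0 of "my-topic" on
    // broker 0; isFromOrdinaryConsumer defaults to true.
    val earliest = SimpleConsumer.earliestOrLatestOffset(
      zkClient, "my-topic", 0, 0, OffsetRequest.EarliestTime)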

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala Fri Oct 12 02:56:28 2012
@@ -33,7 +33,7 @@ class OffsetRequest(requestInfo: java.ut
       requestInfo = scalaMap,
       versionId = versionId,
       clientId = clientId,
-      replicaId = Request.NonFollowerId
+      replicaId = Request.OrdinaryConsumerId
     )
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Oct 12 02:56:28 2012
@@ -64,7 +64,7 @@ class KafkaApis(val requestChannel: Requ
     request.apiLocalCompleteTimeNs = SystemTime.nanoseconds
   }
 
-  def handleLeaderAndISRRequest(request: RequestChannel.Request){
+  def handleLeaderAndISRRequest(request: RequestChannel.Request) {
     val leaderAndISRRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
@@ -81,7 +81,7 @@ class KafkaApis(val requestChannel: Requ
   }
 
 
-  def handleStopReplicaRequest(request: RequestChannel.Request){
+  def handleStopReplicaRequest(request: RequestChannel.Request) {
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
@@ -138,7 +138,7 @@ class KafkaApis(val requestChannel: Requ
         produceRequest.requiredAcks == 1 ||
         produceRequest.numPartitions <= 0 ||
         allPartitionHaveReplicationFactorOne ||
-        numPartitionsInError == produceRequest.numPartitions){
+        numPartitionsInError == produceRequest.numPartitions) {
       val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
       val response = ProducerResponse(produceRequest.versionId, produceRequest.correlationId, statuses)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
@@ -262,21 +262,20 @@ class KafkaApis(val requestChannel: Requ
 
   /**
    * Read from all the offset details given and return a map of
-   * (topic, partition) -> FetchResponsePartitionData
+   * (topic, partition) -> PartitionData
    */
   private def readMessageSets(fetchRequest: FetchRequest) = {
     val isFetchFromFollower = fetchRequest.isFromFollower
     fetchRequest.requestInfo.map {
       case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
-        val partitionData = 
-          try {
-            val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
-            BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
-            BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
-            if (!isFetchFromFollower) {
-              new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
-            } else {
-              debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+        val partitionData = try {
+          val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
+          BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
+          BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
+          if (!isFetchFromFollower) {
+            new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+          } else {
+            debug("Leader %d for topic %s partition %d received fetch request from follower %d"
                           .format(brokerId, topic, partition, fetchRequest.replicaId))
 
             new FetchResponsePartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
@@ -296,25 +295,27 @@ class KafkaApis(val requestChannel: Requ
   /**
    * Read from a single topic/partition at the given offset upto maxSize bytes
    */
-  private def readMessageSet(topic: String, 
-                             partition: Int, 
-                             offset: Long,
-                             maxSize: Int, 
-                             fromFollower: Boolean): (MessageSet, Long) = {
+  private def readMessageSet(topic: String, partition: Int, offset: Long,
+                             maxSize: Int, fromReplicaId: Int): (MessageSet, Long) = {
     // check if the current broker is the leader for the partitions
-    val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
+    val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
+      replicaManager.getReplicaOrException(topic, partition)
+    else
+      replicaManager.getLeaderReplicaIfLocal(topic, partition)
     trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-    val messages = leader.log match {
+    val maxOffsetOpt = if (fromReplicaId == Request.OrdinaryConsumerId) {
+      Some(localReplica.highWatermark)
+    } else {
+      None
+    }
+    val messages = localReplica.log match {
       case Some(log) =>
-        if(fromFollower)
-          log.read(startOffset = offset, maxLength = maxSize, maxOffset = None)
-        else
-          log.read(startOffset = offset, maxLength = maxSize, maxOffset = Some(leader.highWatermark))
+        log.read(offset, maxSize, maxOffsetOpt)
       case None =>
         error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic, partition, brokerId))
         MessageSet.Empty
     }
-    (messages, leader.highWatermark)
+    (messages, localReplica.highWatermark)
   }
 
   /**
@@ -330,15 +331,17 @@ class KafkaApis(val requestChannel: Requ
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
       try {
         // ensure leader exists
-        val leader = replicaManager.getLeaderReplicaIfLocal(
-          topicAndPartition.topic, topicAndPartition.partition)
+        val localReplica = if(!offsetRequest.isFromDebuggingClient)
+          replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
+        else
+          replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition)
         val offsets = {
           val allOffsets = replicaManager.logManager.getOffsets(topicAndPartition,
                                                                 partitionOffsetRequestInfo.time,
                                                                 partitionOffsetRequestInfo.maxNumOffsets)
-          if (offsetRequest.isFromFollower) allOffsets
+          if (!offsetRequest.isFromOrdinaryClient) allOffsets
           else {
-            val hw = leader.highWatermark
+            val hw = localReplica.highWatermark
             if (allOffsets.exists(_ > hw))
               hw +: allOffsets.dropWhile(_ > hw)
             else allOffsets
@@ -488,7 +491,6 @@ class KafkaApis(val requestChannel: Requ
     }).toMap
 
     def respond() {
-      
       val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
         status => {
           val pstat = partitionStatus(new RequestKey(status._1))
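
The heart of the fetch-path change is the maxOffset bound chosen in readMessageSet above: only ordinary consumers are capped at the high watermark, so they never observe uncommitted messages, while followers and the debugging consumer read up to the log end. Restated as a standalone sketch:

    import kafka.api.Request

    // Sketch of the bound chosen in readMessageSet: cap ordinary consumers
    // at the replica's high watermark, let everyone else read past it.
    def maxOffsetFor(fromReplicaId: Int, highWatermark: Long): Option[Long] =
      if (fromReplicaId == Request.OrdinaryConsumerId) Some(highWatermark)
      else None  // follower or Request.DebuggingConsumerId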

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Fri Oct 12 02:56:28 2012
@@ -25,7 +25,7 @@ import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
-import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
+import kafka.common.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
 import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
 
 
@@ -124,6 +124,14 @@ class ReplicaManager(val config: KafkaCo
       Some(partition)
   }
 
+  def getReplicaOrException(topic: String, partition: Int): Replica = {
+    val replicaOpt = getReplica(topic, partition)
+    if(replicaOpt.isDefined)
+      return replicaOpt.get
+    else
+      throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s, %d] yet".format(config.brokerId, topic, partition))
+  }
+
   def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica =  {
     val partitionOpt = getPartition(topic, partitionId)
     partitionOpt match {
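
An equivalent, slightly more idiomatic form of the helper added above, as a sketch (behavior unchanged; assumes the surrounding ReplicaManager scope and the message-only exception constructor added in this commit):

    // Sketch: getReplicaOrException via Option.getOrElse instead of an
    // early return.
    def getReplicaOrException(topic: String, partition: Int): Replica =
      getReplica(topic, partition).getOrElse(
        throw new ReplicaNotAvailableException(
          "Replica %d is not available for partition [%s, %d] yet"
            .format(config.brokerId, topic, partition)))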

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala Fri Oct 12 02:56:28 2012
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import joptsimple._
-import kafka.utils.{Utils, Logging}
-import java.util.concurrent.CountDownLatch
-import kafka.consumer._
-import kafka.serializer.StringDecoder
-
-/**
- * Program to read using the rich consumer and dump the results to standard out
- */
-object ConsumerShell {
-  def main(args: Array[String]): Unit = {
-    
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
-                           .withRequiredArg
-                           .describedAs("topic")
-                           .ofType(classOf[String])
-    val consumerPropsOpt = parser.accepts("props", "REQUIRED: Properties file with the consumer properties.")
-                           .withRequiredArg
-                           .describedAs("properties")
-                           .ofType(classOf[String])
-    val partitionsOpt = parser.accepts("partitions", "Number of partitions to consume from.")
-                           .withRequiredArg
-                           .describedAs("count")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1)
-    
-    val options = parser.parse(args : _*)
-    
-    for(arg <- List(topicOpt, consumerPropsOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("Missing required argument \"" + arg + "\"") 
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-    
-    val partitions = options.valueOf(partitionsOpt).intValue
-    val propsFile = options.valueOf(consumerPropsOpt)
-    val topic = options.valueOf(topicOpt)
-    
-    println("Starting consumer...")
-
-    val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
-    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> partitions), new StringDecoder)
-    var threadList = List[ZKConsumerThread]()
-    for ((topic, streamList) <- topicMessageStreams)
-      for (stream <- streamList)
-        threadList ::= new ZKConsumerThread(stream)
-
-    for (thread <- threadList)
-      thread.start
-
-    // attach shutdown handler to catch control-c
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run() = {
-        consumerConnector.shutdown
-        threadList.foreach(_.shutdown)
-        println("consumer threads shutted down")        
-      }
-    })
-  }
-}
-
-class ZKConsumerThread(stream: KafkaStream[String]) extends Thread with Logging {
-  val shutdownLatch = new CountDownLatch(1)
-
-  override def run() {
-    println("Starting consumer thread..")
-    var count: Int = 0
-    try {
-      for (messageAndMetadata <- stream) {
-        println("consumed: " + messageAndMetadata.message)
-        count += 1
-      }
-    }catch {
-      case e:ConsumerTimeoutException => // this is ok
-      case oe: Exception => error("error in ZKConsumerThread", oe)
-    }
-    shutdownLatch.countDown
-    println("Received " + count + " messages")
-    println("thread shutdown !" )
-  }
-
-  def shutdown() {
-    shutdownLatch.await
-  }          
-}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala Fri Oct 12 02:56:28 2012
@@ -33,7 +33,7 @@ object DumpLogSegments {
       if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
         dumpLog(file, print)
-      } else if(file.getName.endsWith(Log.IndexFileSuffix)){
+      } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
         dumpIndex(file)
       }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Fri Oct 12 02:56:28 2012
@@ -17,62 +17,81 @@
 
 package kafka.tools
 
-import java.net.URI
 import joptsimple._
-import kafka.api.FetchRequestBuilder
 import kafka.utils._
 import kafka.consumer._
+import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
+import kafka.cluster.Broker
+import scala.collection.JavaConversions._
+
 
 /**
  * Command line program to dump out messages to standard out using the simple consumer
  */
 object SimpleConsumerShell extends Logging {
 
+  def USE_LEADER_REPLICA = -1
+
   def main(args: Array[String]): Unit = {
 
     val parser = new OptionParser
-    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
                            .withRequiredArg
-                           .describedAs("kafka://hostname:port")
+                           .describedAs("hostname:port,...,hostname:port")
                            .ofType(classOf[String])
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
                            .withRequiredArg
                            .describedAs("topic")
                            .ofType(classOf[String])
-    val partitionOpt = parser.accepts("partition", "The partition to consume from.")
+    val partitionIdOpt = parser.accepts("partition", "The partition to consume from.")
                            .withRequiredArg
                            .describedAs("partition")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(0)
-    val offsetOpt = parser.accepts("offset", "The offset to start consuming from.")
+    val replicaIdOpt = parser.accepts("replica", "The replica id to consume from, default -1 means leader broker.")
                            .withRequiredArg
-                           .describedAs("offset")
-                           .ofType(classOf[java.lang.Long])
-                           .defaultsTo(0L)
-    val fetchsizeOpt = parser.accepts("fetchsize", "The fetch size of each request.")
-                           .withRequiredArg
-                           .describedAs("fetchsize")
+                           .describedAs("replica id")
                            .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(1000000)
+                           .defaultsTo(USE_LEADER_REPLICA)
+    val offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end")
+                           .withOptionalArg()
+                           .describedAs("consume offset")
+                           .ofType(classOf[java.lang.Long])
+                           .defaultsTo(OffsetRequest.EarliestTime)
     val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
                            .withOptionalArg
                            .describedAs("clientId")
                            .ofType(classOf[String])
                            .defaultsTo("SimpleConsumerShell")
+    val fetchSizeOpt = parser.accepts("fetchsize", "The fetch size of each request.")
+                           .withRequiredArg
+                           .describedAs("fetchsize")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1024 * 1024)
+    val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
+                           .withRequiredArg
+                           .describedAs("class")
+                           .ofType(classOf[String])
+                           .defaultsTo(classOf[NewlineMessageFormatter].getName)
+    val messageFormatterArgOpt = parser.accepts("property")
+                           .withRequiredArg
+                           .describedAs("prop")
+                           .ofType(classOf[String])
     val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
                            .withOptionalArg
                            .describedAs("print offsets")
                            .ofType(classOf[java.lang.Boolean])
                            .defaultsTo(false)
-    val printMessageOpt = parser.accepts("print-messages", "Print the messages returned by the iterator")
-                           .withOptionalArg
-                           .describedAs("print messages")
-                           .ofType(classOf[java.lang.Boolean])
-                           .defaultsTo(false)
+    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+                           .withRequiredArg
+                           .describedAs("ms")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1000)
+    val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
+        "skip it instead of halt.")
 
     val options = parser.parse(args : _*)
-    
-    for(arg <- List(urlOpt, topicOpt)) {
+    for(arg <- List(brokerListOpt, topicOpt, partitionIdOpt)) {
       if(!options.has(arg)) {
         error("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
@@ -80,45 +99,124 @@ object SimpleConsumerShell extends Loggi
       }
     }
 
-    val url = new URI(options.valueOf(urlOpt))
     val topic = options.valueOf(topicOpt)
-    val partition = options.valueOf(partitionOpt).intValue
-    val startingOffset = options.valueOf(offsetOpt).longValue
-    val fetchSize = options.valueOf(fetchsizeOpt).intValue
+    val partitionId = options.valueOf(partitionIdOpt).intValue()
+    val replicaId = options.valueOf(replicaIdOpt).intValue()
+    var startingOffset = options.valueOf(offsetOpt).longValue
+    val fetchSize = options.valueOf(fetchSizeOpt).intValue
     val clientId = options.valueOf(clientIdOpt).toString
+    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
+
+    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val printOffsets = if(options.has(printOffsetOpt)) true else false
-    val printMessages = if(options.has(printMessageOpt)) true else false
 
-    info("Starting consumer...")
-    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
-    val thread = Utils.newThread("kafka-consumer", new Runnable() {
+    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
+    val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
+
+    // getting topic metadata
+    info("Getting topic metatdata...")
+    val metadataTargetBrokers = Utils.getAllBrokersFromBrokerList(options.valueOf(brokerListOpt))
+    val topicsMetadata = Utils.getTopicMetadata(Set(topic), metadataTargetBrokers).topicsMetadata
+    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+      System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
+      System.exit(1)
+    }
+
+    // validating partition id
+    val partitionsMetadata = topicsMetadata(0).partitionsMetadata
+    val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
+    if(!partitionMetadataOpt.isDefined) {
+      System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
+      System.exit(1)
+    }
+
+    // validating replica id and initializing target broker
+    var fetchTargetBroker: Broker = null
+    var replicaOpt: Option[Broker] = null
+    if(replicaId == USE_LEADER_REPLICA) {
+      replicaOpt = partitionMetadataOpt.get.leader
+      if(!replicaOpt.isDefined) {
+        System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId))
+        System.exit(1)
+      }
+    }
+    else {
+      val replicasForPartition = partitionMetadataOpt.get.replicas
+      replicaOpt = replicasForPartition.find(r => r.id == replicaId)
+      if(!replicaOpt.isDefined) {
+        System.err.println("Error: replica %d does not exist for partition (%s, %d)".format(replicaId, topic, partitionId))
+        System.exit(1)
+      }
+    }
+    fetchTargetBroker = replicaOpt.get
+
+    // initializing starting offset
+    if(startingOffset < OffsetRequest.EarliestTime) {
+      System.err.println("Invalid starting offset: %d".format(startingOffset))
+      System.exit(1)
+    }
+    if(startingOffset < 0)
+      startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, false)
+
+    // initializing formatter
+    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    formatter.init(formatterArgs)
+
+    info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]"
+                 .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
+    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024)
+    val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
         var reqId = 0
         var offset = startingOffset
-        while(true) {
-          val fetchRequest = new FetchRequestBuilder()
-            .correlationId(reqId)
-            .clientId(clientId)
-            .addFetch(topic, partition, offset, fetchSize)
-            .build()
-          val fetchResponse = consumer.fetch(fetchRequest)
-          val messageSet = fetchResponse.messageSet(topic, partition)
-          debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
-          var consumed = 0
-          for(messageAndOffset <- messageSet) {
-            if(printMessages)
-              info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
-            offset = messageAndOffset.nextOffset
-            if(printOffsets)
-              info("next offset = " + offset)
-            consumed += 1
+        try {
+          while(true) {
+            val fetchRequest = new FetchRequestBuilder()
+                    .correlationId(reqId)
+                    .clientId(clientId)
+                    .replicaId(Request.DebuggingConsumerId)
+                    .addFetch(topic, partitionId, offset, fetchSize)
+                    .maxWait(maxWaitMs)
+                    .minBytes(ConsumerConfig.MinFetchBytes)
+                    .build()
+            val fetchResponse = simpleConsumer.fetch(fetchRequest)
+            val messageSet = fetchResponse.messageSet(topic, partitionId)
+            debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
+            var consumed = 0
+            for(messageAndOffset <- messageSet) {
+              try {
+                offset = messageAndOffset.offset
+                if(printOffsets)
+                  System.out.println("next offset = " + offset)
+                formatter.writeTo(messageAndOffset.message, System.out)
+              } catch {
+                case e =>
+                  if (skipMessageOnError)
+                    error("Error processing message, skipping this message: ", e)
+                  else
+                    throw e
+              }
+              if(System.out.checkError()) {
+                // This means no one is listening to our output stream any more, time to shutdown
+                System.err.println("Unable to write to standard out, closing consumer.")
+                formatter.close()
+                simpleConsumer.close()
+                System.exit(1)
+              }
+              consumed += 1
+            }
+            reqId += 1
           }
-          reqId += 1
+        } catch {
+          case e: Throwable =>
+            error("Error consuming topic, partition, replica (%s, %d, %d) with request id [%d] and offset [%d]".format(topic, partitionId, replicaId, reqId, offset), e)
         }
       }
-    }, false);
+    }, false)
     thread.start()
     thread.join()
+    System.out.flush()
+    formatter.close()
+    simpleConsumer.close()
   }
-
 }
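
A hypothetical invocation of the reworked shell, reading partition 0 of a placeholder topic from replica 2, starting at the earliest offset (-2), with offsets printed (host name, topic, and replica id are placeholders):

    bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell \
        --broker-list localhost:9092 --topic my-topic --partition 0 \
        --replica 2 --offset -2 --print-offsets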

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1397422&r1=1397421&r2=1397422&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Fri Oct 12 02:56:28 2012
@@ -93,7 +93,7 @@ class SimpleFetchTest extends JUnit3Suit
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
-          .replicaId(Request.NonFollowerId)
+          .replicaId(Request.OrdinaryConsumerId)
           .addFetch(topic, partitionId, 0, fetchSize)
           .build()
     val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)


