kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1401704 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: consumer/SimpleConsumer.scala tools/SimpleConsumerShell.scala
Date Wed, 24 Oct 2012 14:19:18 GMT
Author: junrao
Date: Wed Oct 24 14:19:18 2012
New Revision: 1401704

URL: http://svn.apache.org/viewvc?rev=1401704&view=rev
Log:
SimpleConsumer throws UnsupportedOperationException: empty.head; patched by Yang Ye; reviewed
by Jun Rao; kafka-576

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1401704&r1=1401703&r2=1401704&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Wed
Oct 24 14:19:18 2012
@@ -30,7 +30,8 @@ import kafka.cluster.Broker
 
 
 object SimpleConsumer extends Logging {
-  def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest:
Long, isFromOrdinaryConsumer: Boolean): Long = {
+  def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest:
Long,
+                             isFromOrdinaryConsumer: Boolean): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
     try {
@@ -38,9 +39,10 @@ object SimpleConsumer extends Logging {
                                           ConsumerConfig.SocketBufferSize)
       val topicAndPartition = TopicAndPartition(topic, partitionId)
       val request = if(isFromOrdinaryConsumer)
-        OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest,
1)))
+        new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest,
1)))
       else
-        OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest,
1)), Request.DebuggingConsumerId.toShort)
+        new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest,
1)),
+                          Request.DebuggingConsumerId)
       producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     } catch {
       case e =>
@@ -53,7 +55,8 @@ object SimpleConsumer extends Logging {
     producedOffset
   }
 
-  def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId:
Int, earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true): Long = {
+  def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId:
Int,
+                             earliestOrLatest: Long, isFromOrdinaryConsumer: Boolean = true):
Long = {
     val cluster = getCluster(zkClient)
     val broker = cluster.getBroker(brokerId) match {
       case Some(b) => b

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1401704&r1=1401703&r2=1401704&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
Wed Oct 24 14:19:18 2012
@@ -25,7 +25,6 @@ import kafka.api.{OffsetRequest, FetchRe
 import kafka.cluster.Broker
 import scala.collection.JavaConversions._
 
-
 /**
  * Command line program to dump out messages to standard out using the simple consumer
  */
@@ -90,6 +89,8 @@ object SimpleConsumerShell extends Loggi
                            .defaultsTo(1000)
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error
when processing a message, " +
         "skip it instead of halt.")
+    val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
+        "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting
for new produced messages")
 
     val options = parser.parse(args : _*)
     for(arg <- List(brokerListOpt, topicOpt, partitionIdOpt)) {
@@ -110,6 +111,7 @@ object SimpleConsumerShell extends Loggi
 
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val printOffsets = if(options.has(printOffsetOpt)) true else false
+    val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)
 
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
@@ -182,11 +184,14 @@ object SimpleConsumerShell extends Loggi
                     .build()
             val fetchResponse = simpleConsumer.fetch(fetchRequest)
             val messageSet = fetchResponse.messageSet(topic, partitionId)
+            if (messageSet.validBytes <= 0 && noWaitAtEndOfLog) {
+              println("Terminating. Reached the end of partition (%s, %d) at offset %d".format(topic,
partitionId, offset))
+              return
+            }
             debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
-            var consumed = 0
             for(messageAndOffset <- messageSet) {
               try {
-                offset = messageAndOffset.offset
+                offset = messageAndOffset.nextOffset
                 if(printOffsets)
                   System.out.println("next offset = " + offset)
                 formatter.writeTo(messageAndOffset.message, System.out)
@@ -204,7 +209,6 @@ object SimpleConsumerShell extends Loggi
                 simpleConsumer.close()
                 System.exit(1)
               }
-              consumed += 1
             }
           }
         } catch {



Mime
View raw message