Author: nehanarkhede Date: Tue Aug 23 02:36:24 2011 New Revision: 1160529 URL: http://svn.apache.org/viewvc?rev=1160529&view=rev Log: Improve the command line tools in the bin directory to use the compression feature correctly; KAFKA-112; patched by nehanarkhede; reviewed by junrao Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala - copied, changed from r1160521, incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala Removed: incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Copied: incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala (from r1160521, incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala) URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala?p2=incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala&p1=incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala&r1=1160521&r2=1160529&rev=1160529&view=diff ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/utils/DumpLogSegments.scala (original) +++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala Tue Aug 23 02:36:24 2011 @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.utils +package kafka.tools import java.io._ import kafka.message._ @@ -44,7 +44,7 @@ object DumpLogSegments { println("offset:\t %d \t invalid".format(offset)) if (!isNoPrint) println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8")) - offset += messageAndOffset.offset + offset = messageAndOffset.offset } } } Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala?rev=1160529&r1=1160528&r2=1160529&view=diff ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala (original) +++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerPerformance.scala Tue Aug 23 02:36:24 2011 @@ -219,7 +219,7 @@ object ProducerPerformance { props.put("zk.connect", brokerInfoList(1)) else props.put("broker.list", brokerInfoList(1)) - props.put("compression.codec", config.compressionCodec.toString) + props.put("compression.codec", config.compressionCodec.codec.toString) props.put("reconnect.interval", Integer.MAX_VALUE.toString) props.put("buffer.size", (64*1024).toString) Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1160529&r1=1160528&r2=1160529&view=diff ============================================================================== --- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original) +++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Tue Aug 23 02:36:24 2011 @@ -23,6 +23,7 @@ import kafka.api.FetchRequest import kafka.utils._ import kafka.consumer._ import kafka.server._ +import org.apache.log4j.Logger /** * Command line program to dump out messages to standard out using the simple consumer @@ -30,7 +31,9 @@ import kafka.server._ object SimpleConsumerShell { def main(args: Array[String]): Unit = { - + + val logger = Logger.getLogger(getClass) + val parser = new OptionParser val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.") .withRequiredArg @@ -55,12 +58,22 @@ object SimpleConsumerShell { .describedAs("fetchsize") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000000) + val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator") + .withOptionalArg + .describedAs("print offsets") + .ofType(classOf[java.lang.Boolean]) + .defaultsTo(false) + val printMessageOpt = parser.accepts("print-messages", "Print the messages returned by the iterator") + .withOptionalArg + .describedAs("print messages") + .ofType(classOf[java.lang.Boolean]) + .defaultsTo(false) val options = parser.parse(args : _*) for(arg <- List(urlOpt, topicOpt)) { if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") + logger.error("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) } @@ -71,31 +84,35 @@ object SimpleConsumerShell { val partition = options.valueOf(partitionOpt).intValue val startingOffset = options.valueOf(offsetOpt).longValue val fetchsize = options.valueOf(fetchsizeOpt).intValue + val printOffsets = if(options.has(printOffsetOpt)) true else false + val printMessages = if(options.has(printMessageOpt)) true else false - println("Starting consumer...") + logger.info("Starting consumer...") val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024) val thread = Utils.newThread("kafka-consumer", new Runnable() { def run() { var offset = startingOffset while(true) { - val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize) - val messageSets = consumer.multifetch(fetchRequest) - for (messages <- messageSets) { - println("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset) + val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize) + val messageSets = consumer.multifetch(fetchRequest) + for (messages <- messageSets) { + if(logger.isDebugEnabled) + logger.debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset) var consumed = 0 for(messageAndOffset <- messages) { - println("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) + if(printMessages) + logger.info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) + offset = messageAndOffset.offset + if(printOffsets) + logger.info("next offset = " + offset) consumed += 1 - } - if(consumed > 0) - offset += messages.validBytes + } } - Thread.sleep(10000) } - } + } }, false); thread.start() thread.join() } - + }