kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1198849 - in /incubator/kafka/trunk: config/producer.properties core/src/main/scala/kafka/tools/ProducerShell.scala
Date Mon, 07 Nov 2011 18:11:10 GMT
Author: junrao
Date: Mon Nov  7 18:11:10 2011
New Revision: 1198849

URL: http://svn.apache.org/viewvc?rev=1198849&view=rev
Log:
change ProducerShell to use high level producer; patched by Jun Rao; reviewed by Neha Narkhede;
KAFKA-195

Added:
    incubator/kafka/trunk/config/producer.properties
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala

Added: incubator/kafka/trunk/config/producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/config/producer.properties?rev=1198849&view=auto
==============================================================================
--- incubator/kafka/trunk/config/producer.properties (added)
+++ incubator/kafka/trunk/config/producer.properties Mon Nov  7 18:11:10 2011
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.producer.ProducerConfig for more details
+
+############################# Producer Basics #############################
+
+# need to set either broker.list or zk.connect
+
+# configure brokers statically
+# format: brokerid1:host1:port1,brokerid2:host2:port2 ...
+broker.list=0:localhost:9092
+
+# discover brokers from ZK
+#zk.connect=
+
+# zookeeper session timeout; default is 6000
+#zk.sessiontimeout.ms=
+
+# the max time that the client waits to establish a connection to zookeeper; default is 6000
+#zk.connectiontimeout.ms=
+
+# name of the partitioner class for partitioning events; default partition spreads data randomly
+#partitioner.class=
+
+# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
+producer.type=sync
+
+# specify the compression codec for all data generated: 0: no compression, 1: gzip
+compression.codec=0
+
+# message encoder
+serializer.class=kafka.serializer.StringEncoder
+
+# allow topic level compression
+#compressed.topics=
+
+# max message size; messages larger than that size are discarded; default is 1000000
+#max.message.size=
+
+
+############################# Async Producer #############################
+# maximum time, in milliseconds, for buffering data on the producer queue 
+#queue.time=
+
+# the maximum size of the blocking queue for buffering on the producer 
+#queue.size=
+
+# Timeout for event enqueue:
+# 0: events will be enqueued immediately or dropped if the queue is full
+# -ve: enqueue will block indefinitely if the queue is full
+# +ve: enqueue will block up to this many milliseconds if the queue is full
+#queue.enqueueTimeout.ms=
+
+# the number of messages batched at the producer 
+#batch.size=
+
+# the callback handler for one or multiple events 
+#callback.handler=
+
+# properties required to initialize the callback handler 
+#callback.handler.props=
+
+# the handler for events 
+#event.handler=
+
+# properties required to initialize the event handler 
+#event.handler.props=
+
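
For reference only (not part of this commit): a minimal Scala sketch of how a properties file like the one above is consumed through kafka.producer.ProducerConfig, using only calls that appear in the ProducerShell diff below; the topic name, message text and file path are placeholder values.

    import kafka.producer.{Producer, ProducerConfig, ProducerData}
    import kafka.utils.Utils

    // load config/producer.properties and build the high-level producer
    val producerConfig = new ProducerConfig(Utils.loadProps("config/producer.properties"))
    val producer = new Producer[String, String](producerConfig)

    // with serializer.class=kafka.serializer.StringEncoder, plain strings are sent as message payloads
    producer.send(new ProducerData[String, String]("test-topic", "hello kafka"))
    producer.close()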

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1198849&r1=1198848&r2=1198849&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala Mon Nov  7 18:11:10 2011
@@ -23,6 +23,7 @@ import joptsimple._
 import kafka.message._
 import kafka.producer._
 import java.util.Properties
+import kafka.utils.Utils
 
 /**
  * Interactive shell for producing messages from the command line
@@ -32,9 +33,9 @@ object ProducerShell {
   def main(args: Array[String]) {
     
     val parser = new OptionParser
-    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+    val producerPropsOpt = parser.accepts("props", "REQUIRED: Properties file with the producer properties.")
                            .withRequiredArg
-                           .describedAs("kafka://hostname:port")
+                           .describedAs("properties")
                            .ofType(classOf[String])
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to produce to.")
                            .withRequiredArg
@@ -43,7 +44,7 @@ object ProducerShell {
     
     val options = parser.parse(args : _*)
     
-    for(arg <- List(urlOpt, topicOpt)) {
+    for(arg <- List(producerPropsOpt, topicOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"") 
         parser.printHelpOn(System.err)
@@ -51,15 +52,10 @@ object ProducerShell {
       }
     }
     
-    val url = new URI(options.valueOf(urlOpt))
+    val propsFile = options.valueOf(producerPropsOpt)
+    val producerConfig = new ProducerConfig(Utils.loadProps(propsFile))
     val topic = options.valueOf(topicOpt)
-    val props = new Properties()
-    props.put("host", url.getHost)
-    props.put("port", url.getPort.toString)
-    props.put("buffer.size", "65536")
-    props.put("connect.timeout.ms", "10000")
-    props.put("reconnect.interval", "100")
-    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val producer = new Producer[String, String](producerConfig)
 
     val input = new BufferedReader(new InputStreamReader(System.in))
     var done = false
@@ -68,10 +64,9 @@ object ProducerShell {
       if(line == null) {
         done = true
       } else {
-        val lineBytes = line.trim.getBytes()
-        val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(lineBytes))
-        producer.send(topic, messageList)
-        println("Sent: %s (%d bytes)".format(line, messageList.sizeInBytes))
+        val message = line.trim
+        producer.send(new ProducerData[String, String](topic, message))
+        println("Sent: %s (%d bytes)".format(line, message.getBytes.length))
       }
     }
     producer.close()
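
For illustration only (not part of the commit): started as kafka.tools.ProducerShell with --props config/producer.properties and --topic <topic> (the long-option form accepted by joptsimple), the rewritten shell sends each line typed on stdin as a single string message through the high-level producer and echoes it in the format used above, e.g. "Sent: hello kafka (11 bytes)".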


