kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1199125 - in /incubator/kafka/branches/0.7: ./ config/producer.properties config/server.properties core/src/main/scala/kafka/tools/ProducerShell.scala
Date Tue, 08 Nov 2011 06:48:24 GMT
Author: nehanarkhede
Date: Tue Nov  8 06:48:24 2011
New Revision: 1199125

URL: http://svn.apache.org/viewvc?rev=1199125&view=rev
Log:
merging fix for KAFKA-195 from trunk for 0.7 release

Added:
    incubator/kafka/branches/0.7/config/producer.properties
      - copied unchanged from r1198849, incubator/kafka/trunk/config/producer.properties
Modified:
    incubator/kafka/branches/0.7/   (props changed)
    incubator/kafka/branches/0.7/config/server.properties
    incubator/kafka/branches/0.7/core/src/main/scala/kafka/tools/ProducerShell.scala

Propchange: incubator/kafka/branches/0.7/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov  8 06:48:24 2011
@@ -1 +1 @@
-/incubator/kafka/trunk:1190521-1195253,1195282-1198536
+/incubator/kafka/trunk:1190521-1195253,1198537-1198849

Modified: incubator/kafka/branches/0.7/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.7/config/server.properties?rev=1199125&r1=1199124&r2=1199125&view=diff
==============================================================================
--- incubator/kafka/branches/0.7/config/server.properties (original)
+++ incubator/kafka/branches/0.7/config/server.properties Tue Nov  8 06:48:24 2011
@@ -28,7 +28,7 @@ brokerid=0
 ############################# Socket Server Settings #############################
 
 # The port the socket server listens on
-port=9092
+port=9093
 
 # The number of processor threads the socket server uses for receiving and answering requests.
 # Defaults to the number of cores on the machine

Modified: incubator/kafka/branches/0.7/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.7/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1199125&r1=1199124&r2=1199125&view=diff
==============================================================================
--- incubator/kafka/branches/0.7/core/src/main/scala/kafka/tools/ProducerShell.scala (original)
+++ incubator/kafka/branches/0.7/core/src/main/scala/kafka/tools/ProducerShell.scala Tue Nov  8 06:48:24 2011
@@ -23,6 +23,7 @@ import joptsimple._
 import kafka.message._
 import kafka.producer._
 import java.util.Properties
+import kafka.utils.Utils
 
 /**
  * Interactive shell for producing messages from the command line
@@ -32,9 +33,9 @@ object ProducerShell {
   def main(args: Array[String]) {
     
     val parser = new OptionParser
-    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+    val producerPropsOpt = parser.accepts("props", "REQUIRED: Properties file with the producer properties.")
                            .withRequiredArg
-                           .describedAs("kafka://hostname:port")
+                           .describedAs("properties")
                            .ofType(classOf[String])
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to produce to.")
                            .withRequiredArg
@@ -43,7 +44,7 @@ object ProducerShell {
     
     val options = parser.parse(args : _*)
     
-    for(arg <- List(urlOpt, topicOpt)) {
+    for(arg <- List(producerPropsOpt, topicOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"") 
         parser.printHelpOn(System.err)
@@ -51,15 +52,10 @@ object ProducerShell {
       }
     }
     
-    val url = new URI(options.valueOf(urlOpt))
+    val propsFile = options.valueOf(producerPropsOpt)
+    val producerConfig = new ProducerConfig(Utils.loadProps(propsFile))
     val topic = options.valueOf(topicOpt)
-    val props = new Properties()
-    props.put("host", url.getHost)
-    props.put("port", url.getPort.toString)
-    props.put("buffer.size", "65536")
-    props.put("connect.timeout.ms", "10000")
-    props.put("reconnect.interval", "100")
-    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val producer = new Producer[String, String](producerConfig)
 
     val input = new BufferedReader(new InputStreamReader(System.in))
     var done = false
@@ -68,10 +64,9 @@ object ProducerShell {
       if(line == null) {
         done = true
       } else {
-        val lineBytes = line.trim.getBytes()
-        val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(lineBytes))
-        producer.send(topic, messageList)
-        println("Sent: %s (%d bytes)".format(line, messageList.sizeInBytes))
+        val message = line.trim
+        producer.send(new ProducerData[String, String](topic, message))
+        println("Sent: %s (%d bytes)".format(line, message.getBytes.length))
       }
     }
     producer.close()



Mime
View raw message