kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1413839 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: consumer/ producer/ producer/async/ serializer/ tools/ utils/
Date Mon, 26 Nov 2012 20:59:22 GMT
Author: jkreps
Date: Mon Nov 26 20:59:21 2012
New Revision: 1413839

URL: http://svn.apache.org/viewvc?rev=1413839&view=rev
Log:
KAFKA-544. Follow-up items on key-retention. Addresses misc. comments from Joel, see ticket
for details. 

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1413839&r1=1413838&r2=1413839&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Mon Nov 26 20:59:21 2012
@@ -89,7 +89,7 @@ object ConsoleConsumer extends Logging {
             .withRequiredArg
             .describedAs("class")
             .ofType(classOf[String])
-            .defaultsTo(classOf[NewlineMessageFormatter].getName)
+            .defaultsTo(classOf[DefaultMessageFormatter].getName)
     val messageFormatterArgOpt = parser.accepts("property")
             .withRequiredArg
             .describedAs("prop")
@@ -256,10 +256,27 @@ trait MessageFormatter {
   def close() {}
 }
 
-class NewlineMessageFormatter extends MessageFormatter {
+class DefaultMessageFormatter extends MessageFormatter {
+  var printKey = false
+  var keySeparator = "\t".getBytes
+  var lineSeparator = "\n".getBytes
+  
+  override def init(props: Properties) {
+    if(props.containsKey("print.key"))
+      printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
+    if(props.containsKey("key.separator"))
+      keySeparator = props.getProperty("key.separator").getBytes
+    if(props.containsKey("line.separator"))
+      lineSeparator = props.getProperty("line.separator").getBytes
+  }
+  
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    if(printKey) {
+      output.write(key)
+      output.write(keySeparator)
+    }
     output.write(value)
-    output.write('\n')
+    output.write(lineSeparator)
   }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1413839&r1=1413838&r2=1413839&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala Mon Nov 26 20:59:21 2012
@@ -20,7 +20,9 @@ package kafka.producer
 import scala.collection.JavaConversions._
 import joptsimple._
 import java.util.Properties
+import java.util.regex._
 import java.io._
+import kafka.common._
 import kafka.message._
 import kafka.serializer._
 
@@ -49,13 +51,18 @@ object ConsoleProducer { 
                                .describedAs("timeout_ms")
                                .ofType(classOf[java.lang.Long])
                                .defaultsTo(1000)
-    val messageEncoderOpt = parser.accepts("message-encoder", "The class name of the message encoder implementation to use.")
+    val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
+                                 .withRequiredArg
+                                 .describedAs("encoder_class")
+                                 .ofType(classOf[java.lang.String])
+                                 .defaultsTo(classOf[StringEncoder].getName)
+    val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
                                  .withRequiredArg
                                  .describedAs("encoder_class")
                                  .ofType(classOf[java.lang.String])
                                  .defaultsTo(classOf[StringEncoder].getName)
     val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " + 
-                                                          "By default each line is read as a seperate message.")
+                                                          "By default each line is read as a separate message.")
                                   .withRequiredArg
                                   .describedAs("reader_class")
                                   .ofType(classOf[java.lang.String])
@@ -82,9 +89,11 @@ object ConsoleProducer { 
     val compress = options.has(compressOpt)
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
-    val encoderClass = options.valueOf(messageEncoderOpt)
+    val keyEncoderClass = options.valueOf(keyEncoderOpt)
+    val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
     val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
+    cmdLineProps.put("topic", topic)
 
     val props = new Properties()
     props.put("broker.list", brokerList)
@@ -94,12 +103,13 @@ object ConsoleProducer { 
     if(options.has(batchSizeOpt))
       props.put("batch.size", batchSize.toString)
     props.put("queue.time", sendTimeout.toString)
-    props.put("serializer.class", encoderClass)
+    props.put("key.serializer.class", keyEncoderClass)
+    props.put("serializer.class", valueEncoderClass)
 
-    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
+    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
     reader.init(System.in, cmdLineProps)
 
-    val producer = new Producer[Any, Any](new ProducerConfig(props))
+    val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -107,11 +117,11 @@ object ConsoleProducer { 
       }
     })
 
-    var message: AnyRef = null
+    var message: KeyedMessage[AnyRef, AnyRef] = null
     do { 
       message = reader.readMessage()
       if(message != null)
-        producer.send(new KeyedMessage(topic, message))
+        producer.send(message)
     } while(message != null)
   }
 
@@ -127,19 +137,49 @@ object ConsoleProducer { 
     props
   }
 
-  trait MessageReader { 
+  trait MessageReader[K,V] { 
     def init(inputStream: InputStream, props: Properties) {}
-    def readMessage(): AnyRef
+    def readMessage(): KeyedMessage[K,V]
     def close() {}
   }
 
-  class LineMessageReader extends MessageReader { 
+  class LineMessageReader extends MessageReader[String, String] {
+    var topic: String = null
     var reader: BufferedReader = null
-
-    override def init(inputStream: InputStream, props: Properties) { 
+    var parseKey = false
+    var keySeparator = "\t"
+    var ignoreError = false
+    var lineNumber = 0
+
+    override def init(inputStream: InputStream, props: Properties) {
+      topic = props.getProperty("topic")
+      if(props.containsKey("parse.key"))
+        parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
+      if(props.containsKey("key.seperator"))
+        keySeparator = props.getProperty("key.separator")
+      if(props.containsKey("ignore.error"))
+        ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
       reader = new BufferedReader(new InputStreamReader(inputStream))
     }
 
-    override def readMessage() = reader.readLine()
+    override def readMessage() = {
+      lineNumber += 1
+      val line = reader.readLine()
+      if(parseKey) {
+        line.indexOf(keySeparator) match {
+          case -1 =>
+            if(ignoreError)
+              new KeyedMessage(topic, line)
+            else
+              throw new KafkaException("No key found on line " + lineNumber + ": " + line)
+          case n =>
+            new KeyedMessage(topic,
+                             line.substring(0, n), 
+                             if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size))
+        }
+      } else {
+        new KeyedMessage(topic, line) 
+      }
+    }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1413839&r1=1413838&r2=1413839&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Mon Nov 26 20:59:21 2012
@@ -19,7 +19,7 @@ package kafka.producer.async
 
 import kafka.utils.{SystemTime, Logging}
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
-import collection.mutable.ListBuffer
+import collection.mutable.ArrayBuffer
 import kafka.producer.KeyedMessage
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
@@ -57,7 +57,7 @@ class ProducerSendThread[K,V](val thread
 
   private def processEvents() {
     var lastSend = SystemTime.milliseconds
-    var events = new ListBuffer[KeyedMessage[K,V]]
+    var events = new ArrayBuffer[KeyedMessage[K,V]]
     var full: Boolean = false
 
     // drain the queue until you get a shutdown command
@@ -85,7 +85,7 @@ class ProducerSendThread[K,V](val thread
           // if either queue time has reached or batch size has reached, dispatch to event handler
           tryToHandle(events)
           lastSend = SystemTime.milliseconds
-          events = new ListBuffer[KeyedMessage[K,V]]
+          events = new ArrayBuffer[KeyedMessage[K,V]]
         }
     }
     // send the last batch of events

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala?rev=1413839&r1=1413838&r2=1413839&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/serializer/Decoder.scala Mon Nov 26 20:59:21 2012
@@ -37,13 +37,6 @@ class DefaultDecoder(props: VerifiablePr
 }
 
 /**
- * Decode messages without any key
- */
-class KeylessMessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] {
-  def fromBytes(bytes: Array[Byte]) = new Message(bytes)
-}
-
-/**
  * The string encoder translates strings into bytes. It uses UTF8 by default but takes
  * an optional property serializer.encoding to control this.
  */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1413839&r1=1413838&r2=1413839&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Mon Nov 26 20:59:21 2012
@@ -74,7 +74,7 @@ object SimpleConsumerShell extends Loggi
                            .withRequiredArg
                            .describedAs("class")
                            .ofType(classOf[String])
-                           .defaultsTo(classOf[NewlineMessageFormatter].getName)
+                           .defaultsTo(classOf[DefaultMessageFormatter].getName)
     val messageFormatterArgOpt = parser.accepts("property")
                            .withRequiredArg
                            .describedAs("prop")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1413839&r1=1413838&r2=1413839&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Nov 26 20:59:21 2012
@@ -426,7 +426,7 @@ object Utils extends Logging {
   }
 
   /**
-   * This method gets comma seperated values which contains key,value pairs and returns a map of
+   * This method gets comma separated values which contains key,value pairs and returns a map of
    * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
    */
   def parseCsvMap(str: String): Map[String, String] = {



Mime
View raw message