kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1176671 - in /incubator/kafka/trunk: core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/consumer/ core/src/main/scala/kafka/tools/ core/src/test/scala/other/kafka/ core/src/test/scala/unit/kafka/consumer/ core/src/test/s...
Date Wed, 28 Sep 2011 00:42:31 GMT
Author: junrao
Date: Wed Sep 28 00:42:30 2011
New Revision: 1176671

URL: http://svn.apache.org/viewvc?rev=1176671&view=rev
Log:
Consumer needs a pluggable decoder; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-3

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
    incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java
    incubator/kafka/trunk/project/build/KafkaProject.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Wed Sep 28 00:42:30 2011
@@ -119,7 +119,7 @@ object ConsoleConsumer {
       }
     })
     
-    var stream: KafkaMessageStream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
+    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
     val iter =
       if(maxMessages >= 0)
         stream.slice(0, maxMessages)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Wed Sep 28 00:42:30 2011
@@ -20,6 +20,7 @@ package kafka.consumer
 import scala.collection._
 import kafka.utils.Utils
 import org.apache.log4j.Logger
+import kafka.serializer.{DefaultDecoder, Decoder}
 
 /**
  *  Main interface for consumer
@@ -32,7 +33,9 @@ trait ConsumerConnector {
    *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
    *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
    */
-  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]]
+  def createMessageStreams[T](topicCountMap: Map[String,Int],
+                              decoder: Decoder[T] = new DefaultDecoder)
+    : Map[String,List[KafkaMessageStream[T]]]
 
   /**
    *  Commit the offsets of all broker partitions connected by this connector.

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Wed Sep 28 00:42:30 2011
@@ -22,32 +22,35 @@ import org.apache.log4j.Logger
 import java.util.concurrent.{TimeUnit, BlockingQueue}
 import kafka.cluster.Partition
 import kafka.message.{MessageAndOffset, MessageSet, Message}
+import kafka.serializer.Decoder
 
 /**
  * An iterator that blocks until a value can be read from the supplied queue.
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  * 
  */
-class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
-        extends IteratorTemplate[Message] {
+class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
+                          consumerTimeoutMs: Int,
+                          private val decoder: Decoder[T])
+        extends IteratorTemplate[T] {
   
-  private val logger = Logger.getLogger(classOf[ConsumerIterator])
+  private val logger = Logger.getLogger(classOf[ConsumerIterator[T]])
   private var current: Iterator[MessageAndOffset] = null
   private var currentDataChunk: FetchedDataChunk = null
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
 
-  override def next(): Message = {
-    val message = super.next
+  override def next(): T = {
+    val decodedMessage = super.next()
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     if(logger.isTraceEnabled)
       logger.trace("Setting consumed offset to %d".format(consumedOffset))
-    message
+    decodedMessage
   }
 
-  protected def makeNext(): Message = {
+  protected def makeNext(): T = {
     // if we don't have an iterator, get one
     if(current == null || !current.hasNext) {
       if (consumerTimeoutMs < 0)
@@ -62,7 +65,7 @@ class ConsumerIterator(private val chann
         if(logger.isDebugEnabled)
           logger.debug("Received the shutdown command")
     	  channel.offer(currentDataChunk)
-        return allDone
+        return allDone()
       } else {
         currentTopicInfo = currentDataChunk.topicInfo
         if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
@@ -73,9 +76,9 @@ class ConsumerIterator(private val chann
         current = currentDataChunk.messages.iterator
       }
     }
-    val item = current.next
+    val item = current.next()
     consumedOffset = item.offset
-    item.message
+    decoder.toEvent(item.message)
   }
   
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/KafkaMessageStream.scala Wed Sep 28 00:42:30 2011
@@ -20,20 +20,23 @@ package kafka.consumer
 import java.util.concurrent.BlockingQueue
 import org.apache.log4j.Logger
 import kafka.message.Message
-
+import kafka.serializer.{DefaultDecoder, Decoder}
 
 /**
  * All calls to elements should produce the same thread-safe iterator? Should have a seperate thread
  * that feeds messages into a blocking queue for processing.
  */
-class KafkaMessageStream(private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
-   extends Iterable[Message] with java.lang.Iterable[Message]{
+class KafkaMessageStream[T](private val queue: BlockingQueue[FetchedDataChunk],
+                            consumerTimeoutMs: Int,
+                            private val decoder: Decoder[T])
+   extends Iterable[T] with java.lang.Iterable[T]{
 
   private val logger = Logger.getLogger(getClass())
-  private val iter: ConsumerIterator = new ConsumerIterator(queue, consumerTimeoutMs)
+  private val iter: ConsumerIterator[T] =
+    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)
     
   /**
    *  Create an iterator over messages in the stream.
    */
-  def iterator(): ConsumerIterator = iter
+  def iterator(): ConsumerIterator[T] = iter
 }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Sep 28 00:42:30 2011
@@ -29,6 +29,7 @@ import org.I0Itec.zkclient.{IZkStateList
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.api.OffsetRequest
 import java.util.UUID
+import kafka.serializer.Decoder
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -103,8 +104,10 @@ private[kafka] class ZookeeperConsumerCo
 
   def this(config: ConsumerConfig) = this(config, true)
 
-  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]] = {
-    consume(topicCountMap)
+  def createMessageStreams[T](topicCountMap: Map[String,Int],
+                              decoder: Decoder[T])
+      : Map[String,List[KafkaMessageStream[T]]] = {
+    consume(topicCountMap, decoder)
   }
 
   private def createFetcher() {
@@ -143,13 +146,15 @@ private[kafka] class ZookeeperConsumerCo
     }
   }
 
-  def consume(topicCountMap: scala.collection.Map[String,Int]): Map[String,List[KafkaMessageStream]] = {
+  def consume[T](topicCountMap: scala.collection.Map[String,Int],
+                 decoder: Decoder[T])
+      : Map[String,List[KafkaMessageStream[T]]] = {
     logger.debug("entering consume ")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")
 
     val dirs = new ZKGroupDirs(config.groupId)
-    var ret = new mutable.HashMap[String,List[KafkaMessageStream]]
+    var ret = new mutable.HashMap[String,List[KafkaMessageStream[T]]]
 
     var consumerUuid : String = null
     config.consumerId match {
@@ -177,11 +182,11 @@ private[kafka] class ZookeeperConsumerCo
     // create a queue per topic per consumer thread
     val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
     for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
-      var streamList: List[KafkaMessageStream] = Nil
+      var streamList: List[KafkaMessageStream[T]] = Nil
       for (threadId <- threadIdSet) {
         val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream(stream, config.consumerTimeoutMs)
+        streamList ::= new KafkaMessageStream[T](stream, config.consumerTimeoutMs, decoder)
       }
       ret += (topic -> streamList)
       logger.debug("adding topic " + topic + " and stream to map..")

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Wed Sep 28 00:42:30 2011
@@ -18,19 +18,25 @@
 package kafka.javaapi.consumer;
 
 import kafka.consumer.KafkaMessageStream;
+import kafka.message.Message;
+import kafka.serializer.Decoder;
 
 import java.util.List;
 import java.util.Map;
 
 public interface ConsumerConnector {
     /**
-     *  Create a list of MessageStreams for each topic.
+     *  Create a list of MessageStreams of type T for each topic.
      *
      *  @param topicCountMap  a map of (topic, #streams) pair
+     *  @param decoder a decoder that converts from Message to T
      *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
      *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
      */
-    public Map<String, List<KafkaMessageStream>> createMessageStreams(Map<String, Integer> topicCountMap);
+    public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
+            Map<String, Integer> topicCountMap, Decoder<T> decoder);
+    public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams(
+            Map<String, Integer> topicCountMap);
 
     /**
      *  Commit the offsets of all broker partitions connected by this connector.

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Wed Sep 28 00:42:30 2011
@@ -17,6 +17,8 @@
 package kafka.javaapi.consumer
 
 import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
+import kafka.message.Message
+import kafka.serializer.{DefaultDecoder, Decoder}
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -63,15 +65,17 @@ private[kafka] class ZookeeperConsumerCo
   def this(config: ConsumerConfig) = this(config, true)
 
  // for java client
-  def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]):
-    java.util.Map[String,java.util.List[KafkaMessageStream]] = {
+  def createMessageStreams[T](
+        topicCountMap: java.util.Map[String,java.lang.Integer],
+        decoder: Decoder[T])
+      : java.util.Map[String,java.util.List[KafkaMessageStream[T]]] = {
     import scala.collection.JavaConversions._
 
     val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
-    val scalaReturn = underlying.consume(scalaTopicCountMap)
-    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream]]
+    val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
+    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream[T]]]
     for ((topic, streams) <- scalaReturn) {
-      var javaStreamList = new java.util.ArrayList[KafkaMessageStream]
+      var javaStreamList = new java.util.ArrayList[KafkaMessageStream[T]]
       for (stream <- streams)
         javaStreamList.add(stream)
       ret.put(topic, javaStreamList)
@@ -79,6 +83,12 @@ private[kafka] class ZookeeperConsumerCo
     ret
   }
 
+  def createMessageStreams(
+        topicCountMap: java.util.Map[String,java.lang.Integer])
+      : java.util.Map[String,java.util.List[KafkaMessageStream[Message]]] =
+    createMessageStreams(topicCountMap, new DefaultDecoder)
+
+
   def commitOffsets() {
     underlying.commitOffsets
   }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ConsumerShell.scala Wed Sep 28 00:42:30 2011
@@ -22,6 +22,7 @@ import kafka.utils.Utils
 import java.util.concurrent.CountDownLatch
 import org.apache.log4j.Logger
 import kafka.consumer._
+import kafka.serializer.StringDecoder
 
 /**
  * Program to read using the rich consumer and dump the results to standard out
@@ -63,7 +64,7 @@ object ConsumerShell {
 
     val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
-    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> partitions))
+    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> partitions), new StringDecoder)
     var threadList = List[ZKConsumerThread]()
     for ((topic, streamList) <- topicMessageStreams)
       for (stream <- streamList)
@@ -83,7 +84,7 @@ object ConsumerShell {
   }
 }
 
-class ZKConsumerThread(stream: KafkaMessageStream) extends Thread {
+class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
   val logger = Logger.getLogger(getClass)
 
@@ -92,7 +93,7 @@ class ZKConsumerThread(stream: KafkaMess
     var count: Int = 0
     try {
       for (message <- stream) {
-        println("consumed: " + Utils.toString(message.payload, "UTF-8"))
+        println("consumed: " + message)
         count += 1
       }
     }catch {

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Wed Sep 28 00:42:30 2011
@@ -139,7 +139,7 @@ object ReplayLogProducer {
     }
   }
 
-  class ZKConsumerThread(config: Config, stream: KafkaMessageStream) extends Thread {
+  class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread {
     val shutdownLatch = new CountDownLatch(1)
     val logger = Logger.getLogger(getClass)
     val props = new Properties()

Modified: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala Wed Sep 28 00:42:30 2011
@@ -18,6 +18,7 @@
 package kafka
 
 import consumer._
+import message.Message
 import utils.Utils
 import java.util.concurrent.CountDownLatch
 
@@ -55,7 +56,7 @@ object TestZKConsumerOffsets {
   }
 }
 
-private class ConsumerThread(stream: KafkaMessageStream) extends Thread {
+private class ConsumerThread(stream: KafkaMessageStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Wed Sep 28 00:42:30 2011
@@ -27,6 +27,7 @@ import kafka.utils.{TestZKUtils, TestUti
 import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
+import kafka.serializer.StringDecoder
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
   private val logger = Logger.getLogger(getClass())
@@ -124,26 +125,6 @@ class ZookeeperConsumerConnectorTest ext
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
     requestHandlerLogger.setLevel(Level.FATAL)
 
-    var actualMessages: List[Message] = Nil
-
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 200
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-        println("This is ok")
-      case e => throw e
-    }
-    zkConsumerConnector0.shutdown
-
     println("Sending messages for 1st consumer")
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
@@ -227,6 +208,41 @@ class ZookeeperConsumerConnectorTest ext
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+  def testConsumerDecoder() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
+      map(m => Utils.toString(m.payload, "UTF-8")).
+      sortWith((s, t) => s.compare(t) == -1)
+    val consumerConfig = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+
+    val zkConsumerConnector =
+      new ZookeeperConsumerConnector(consumerConfig, true)
+    val topicMessageStreams =
+      zkConsumerConnector.createMessageStreams(
+        Predef.Map(topic -> numNodes*numParts/2), new StringDecoder)
+
+    var receivedMessages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessages * 2) {
+          assertTrue(iterator.hasNext())
+          val message = iterator.next()
+          receivedMessages ::= message
+          logger.debug("received message: " + message)
+        }
+      }
+    }
+    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
+    assertEquals(sentMessages, receivedMessages)
+
+    zkConsumerConnector.shutdown()
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
     val producer = TestUtils.createProducer("localhost", conf.port)
@@ -250,7 +266,7 @@ class ZookeeperConsumerConnectorTest ext
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream]]): List[Message]= {
+  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
     var messages: List[Message] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Wed Sep 28 00:42:30 2011
@@ -31,6 +31,7 @@ import kafka.consumer.{Consumer, Consume
 import javax.management.NotCompliantMBeanException
 import org.apache.log4j.{Level, Logger}
 import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, CompressionCodec, Message}
+import kafka.serializer.StringDecoder
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
   private val logger = Logger.getLogger(getClass())
@@ -125,24 +126,6 @@ class ZookeeperConsumerConnectorTest ext
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
     requestHandlerLogger.setLevel(Level.FATAL)
-    var actualMessages: List[Message] = Nil
-
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 200
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-      case e => throw e
-    }
-    zkConsumerConnector0.shutdown
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
@@ -224,6 +207,41 @@ class ZookeeperConsumerConnectorTest ext
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+  def testConsumerDecoder() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
+      map(m => Utils.toString(m.payload, "UTF-8")).
+      sortWith((s, t) => s.compare(t) == -1)
+    val consumerConfig = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+
+    val zkConsumerConnector =
+      new ZookeeperConsumerConnector(consumerConfig, true)
+    val topicMessageStreams = zkConsumerConnector.createMessageStreams(
+      Predef.Map(topic -> new java.lang.Integer(numNodes * numParts / 2)), new StringDecoder)
+
+    var receivedMessages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessages * 2) {
+          assertTrue(iterator.hasNext())
+          val message = iterator.next()
+          receivedMessages ::= message
+          logger.debug("received message: " + message)
+        }
+      }
+    }
+    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
+    assertEquals(sentMessages, receivedMessages)
+
+    zkConsumerConnector.shutdown()
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
+
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
     val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost", conf.port))
@@ -247,7 +265,7 @@ class ZookeeperConsumerConnectorTest ext
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream]])
+  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream[Message]]])
   : List[Message]= {
     var messages: List[Message] = Nil
     val topicMessageStreams = asMap(jTopicMessageStreams)

Modified: incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java (original)
+++ incubator/kafka/trunk/examples/src/main/java/kafka/examples/Consumer.java Wed Sep 28 00:42:30 2011
@@ -16,15 +16,16 @@
  */
 package kafka.examples;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaMessageStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.Message;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 public class Consumer extends Thread
 {
@@ -33,7 +34,8 @@ public class Consumer extends Thread
   
   public Consumer(String topic)
   {
-    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
+    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+            createConsumerConfig());
     this.topic = topic;
   }
 
@@ -53,9 +55,9 @@ public class Consumer extends Thread
   public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, new Integer(1));
-    Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaMessageStream stream =  consumerMap.get(topic).get(0);
-    ConsumerIterator it = stream.iterator();
+    Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaMessageStream<Message> stream =  consumerMap.get(topic).get(0);
+    ConsumerIterator<Message> it = stream.iterator();
     while(it.hasNext())
       System.out.println(ExampleUtils.getMessage(it.next()));
   }

Modified: incubator/kafka/trunk/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/project/build/KafkaProject.scala?rev=1176671&r1=1176670&r2=1176671&view=diff
==============================================================================
--- incubator/kafka/trunk/project/build/KafkaProject.scala (original)
+++ incubator/kafka/trunk/project/build/KafkaProject.scala Wed Sep 28 00:42:30 2011
@@ -129,6 +129,8 @@ class KafkaProject(info: ProjectInfo) ex
 
     override def artifactID = "kafka-java-examples"
     override def filterScalaJars = false
+    override def javaCompileOptions = super.javaCompileOptions ++
+      List(JavaCompileOption("-Xlint:unchecked"))
   }
 
   class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info)



Mime
View raw message