kafka-commits mailing list archives

From: jkr...@apache.org
Subject: svn commit: r1410055 [2/3] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/consum...
Date: Thu, 15 Nov 2012 22:15:26 GMT
Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala Thu Nov 15 22:15:14 2012
@@ -44,7 +44,7 @@ object TestKafkaAppender extends Logging
   }
 }
 
-class AppenderStringSerializer extends Encoder[AnyRef] {
-  def toMessage(event: AnyRef):Message = new Message(event.asInstanceOf[String].getBytes)
+class AppenderStringSerializer(encoding: String = "UTF-8") extends Encoder[AnyRef] {
+  def toBytes(event: AnyRef): Array[Byte] = event.toString.getBytes(encoding)
 }
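
Note: the change above reflects the serializer API overhaul running through this commit: Encoder implementations no longer build a Message via toMessage but return raw bytes via toBytes, leaving Message construction to the producer pipeline. A minimal sketch of an encoder written against the trait shape shown in this diff (the class name Utf8Encoder is illustrative, not part of the commit):

    import kafka.serializer.Encoder

    // New-style encoder: produce bytes only; the producer wraps them
    // in a Message (and applies compression) itself.
    class Utf8Encoder(encoding: String = "UTF-8") extends Encoder[String] {
      def toBytes(s: String): Array[Byte] = s.getBytes(encoding)
    }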
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala Thu Nov 15 22:15:14 2012
@@ -56,13 +56,13 @@ object TestZKConsumerOffsets {
   }
 }
 
-private class ConsumerThread(stream: KafkaStream[Message]) extends Thread {
+private class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {
     println("Starting consumer thread..")
     for (messageAndMetadata <- stream) {
-      println("consumed: " + Utils.readString(messageAndMetadata.message.payload, "UTF-8"))
+      println("consumed: " + new String(messageAndMetadata.message, "UTF-8"))
     }
     shutdownLatch.countDown
     println("thread shutdown !" )

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala Thu Nov 15 22:15:14 2012
@@ -26,10 +26,10 @@ import junit.framework.Assert._
 import kafka.message._
 import kafka.server._
 import kafka.utils.TestUtils._
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils._
 import kafka.admin.CreateTopicCommand
 import org.junit.Test
-import kafka.serializer.DefaultDecoder
+import kafka.serializer._
 import kafka.cluster.{Broker, Cluster}
 import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness
@@ -46,13 +46,14 @@ class ConsumerIteratorTest extends JUnit
   val topic = "topic"
   val group = "group1"
   val consumer0 = "consumer0"
+  val consumedOffset = 5
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
                                                            c.brokerId,
                                                            0,
                                                            queue,
-                                                           new AtomicLong(5),
+                                                           new AtomicLong(consumedOffset),
                                                            new AtomicLong(0),
                                                            new AtomicInteger(0)))
   val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
@@ -65,24 +66,25 @@ class ConsumerIteratorTest extends JUnit
 
   @Test
   def testConsumerIteratorDeduplicationDeepIterator() {
-    val messages = 0.until(10).map(x => new Message((configs(0).brokerId * 5 + x).toString.getBytes)).toList
+    val messageStrings = (0 until 10).map(_.toString).toList
+    val messages = messageStrings.map(s => new Message(s.getBytes))
     val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new AtomicLong(0), messages:_*)
 
     topicInfos(0).enqueue(messageSet)
     assertEquals(1, queue.size)
     queue.put(ZookeeperConsumerConnector.shutdownCommand)
 
-    val iter: ConsumerIterator[Message] = new ConsumerIterator[Message](queue, consumerConfig.consumerTimeoutMs,
-                                                                        new DefaultDecoder, false)
-    var receivedMessages: List[Message] = Nil
-    for (i <- 0 until 5) {
-      assertTrue(iter.hasNext)
-      receivedMessages ::= iter.next.message
-    }
+    val iter = new ConsumerIterator[String, String](queue, 
+                                                    consumerConfig.consumerTimeoutMs,
+                                                    new StringDecoder(), 
+                                                    new StringDecoder(),
+                                                    enableShallowIterator = false)
+    var receivedMessages = (0 until 5).map(i => iter.next.message).toList
 
-    assertTrue(!iter.hasNext)
+    assertFalse(iter.hasNext)
     assertEquals(1, queue.size) // This is only the shutdown command.
     assertEquals(5, receivedMessages.size)
-    assertEquals(receivedMessages.sortWith((s,t) => s.checksum < t.checksum), messages.takeRight(5).sortWith((s,t) => s.checksum < t.checksum))
+    val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload))
+    assertEquals(unconsumed, receivedMessages)
   }
 }
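
Note: ConsumerIterator likewise gains two type parameters and now takes separate key and value decoders plus an explicit enableShallowIterator flag. A sketch of the new construction, mirroring the test above (decoder order follows that call; the -1 timeout is an assumption meaning "block indefinitely", by analogy with consumer.timeout.ms):

    import java.util.concurrent.LinkedBlockingQueue
    import kafka.consumer.{ConsumerIterator, FetchedDataChunk}
    import kafka.serializer.StringDecoder

    val chunkQueue = new LinkedBlockingQueue[FetchedDataChunk]
    val iter = new ConsumerIterator[String, String](
      chunkQueue,
      -1,                            // consumer timeout; assumed to mean "never time out"
      new StringDecoder(),           // key decoder
      new StringDecoder(),           // value decoder
      enableShallowIterator = false) // deep-iterate compressed message sets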

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Thu Nov 15 22:15:14 2012
@@ -25,11 +25,11 @@ import scala.collection._
 import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
-import kafka.serializer.StringDecoder
+import kafka.serializer._
 import kafka.admin.CreateTopicCommand
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
-import kafka.producer.{ProducerConfig, ProducerData, Producer}
+import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 import java.util.{Collections, Properties}
 import kafka.utils.TestUtils._
 
@@ -73,7 +73,7 @@ class ZookeeperConsumerConnectorTest ext
       override val consumerTimeoutMs = 200
     }
     val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
+    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
 
     // no messages to consume, we should hit timeout;
     // also the iterator should support re-entrant, so loop it twice
@@ -90,9 +90,8 @@ class ZookeeperConsumerConnectorTest ext
     zkConsumerConnector0.shutdown
 
     // send some messages to each broker
-    val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
-    val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
-    val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
+    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
+                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
 
     // wait to make sure the topic and partition have a leader for the successful case
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
@@ -101,11 +100,10 @@ class ZookeeperConsumerConnectorTest ext
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
 
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
-    assertEquals(sentMessages1.size, receivedMessages1.size)
-    assertEquals(sentMessages1, receivedMessages1)
+    assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
@@ -121,19 +119,16 @@ class ZookeeperConsumerConnectorTest ext
       override val rebalanceBackoffMs = RebalanceBackoffMs
     }
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
-    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
+    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
-    val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
-    val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+    val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
+                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
-    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
-    val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
-    assertEquals(sentMessages2, receivedMessages2)
+    val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
+    assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
@@ -147,18 +142,14 @@ class ZookeeperConsumerConnectorTest ext
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
-    val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
-    val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
-    val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ 
+                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
-    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
-    val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
-    assertEquals(sentMessages3.size, receivedMessages3.size)
-    assertEquals(sentMessages3, receivedMessages3)
+    val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
+    assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
 
     // also check partition ownership
     val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
@@ -176,9 +167,8 @@ class ZookeeperConsumerConnectorTest ext
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
-    val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
-    val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
+    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ 
+                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
@@ -187,10 +177,9 @@ class ZookeeperConsumerConnectorTest ext
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
-    assertEquals(sentMessages1.size, receivedMessages1.size)
-    assertEquals(sentMessages1, receivedMessages1)
+    assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
 
     // also check partition ownership
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
@@ -206,19 +195,16 @@ class ZookeeperConsumerConnectorTest ext
       override val rebalanceBackoffMs = RebalanceBackoffMs
     }
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
-    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
+    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
-    val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
-    val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+    val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
+                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
-    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
-    val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
-    assertEquals(sentMessages2, receivedMessages2)
+    val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
+    assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
@@ -230,20 +216,16 @@ class ZookeeperConsumerConnectorTest ext
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
-    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
-    val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
-    val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
+                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
-    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
-    val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
-    assertEquals(sentMessages3.size, receivedMessages3.size)
-    assertEquals(sentMessages3, receivedMessages3)
+    val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2)
+    assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
 
     // also check partition ownership
     val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
@@ -258,17 +240,14 @@ class ZookeeperConsumerConnectorTest ext
 
   def testCompressionSetConsumption() {
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec)
-    val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
-    val sentMessages = (sentMessages1 ++ sentMessages2).sortWith((s,t) => s.checksum < t.checksum)
+    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ 
+                       sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
 
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     val receivedMessages = getMessages(400, topicMessageStreams1)
-    val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
-    val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
-    assertEquals(sortedSentMessages, sortedReceivedMessages)
+    assertEquals(sentMessages.sorted, receivedMessages.sorted)
 
     // also check partition ownership
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
@@ -284,10 +263,8 @@ class ZookeeperConsumerConnectorTest ext
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec)
-    val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
-    val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.readString(m.payload, "UTF-8")).
-      sortWith((s, t) => s.compare(t) == -1)
+    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ 
+                       sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
 
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
 
@@ -297,8 +274,7 @@ class ZookeeperConsumerConnectorTest ext
     val zkConsumerConnector =
       new ZookeeperConsumerConnector(consumerConfig, true)
     val topicMessageStreams =
-      zkConsumerConnector.createMessageStreams(Predef.Map(topic -> 1), new StringDecoder)
-
+      zkConsumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
 
     var receivedMessages: List[String] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
@@ -312,8 +288,7 @@ class ZookeeperConsumerConnectorTest ext
         }
       }
     }
-    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
-    assertEquals(sentMessages, receivedMessages)
+    assertEquals(sentMessages.sorted, receivedMessages.sorted)
 
     zkConsumerConnector.shutdown()
     requestHandlerLogger.setLevel(Level.ERROR)
@@ -331,7 +306,7 @@ class ZookeeperConsumerConnectorTest ext
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
     val topicRegistry = zkConsumerConnector1.getTopicRegistry
     assertEquals(1, topicRegistry.map(r => r._1).size)
     assertEquals(topic, topicRegistry.map(r => r._1).head)
@@ -346,54 +321,61 @@ class ZookeeperConsumerConnectorTest ext
     assertEquals(expected_1, actual_1)
 
     val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
-    assertEquals(nMessages, receivedMessages1.size)
-    assertEquals(sentMessages1.sortWith((s,t) => s.checksum < t.checksum), receivedMessages1)
+    assertEquals(sentMessages1, receivedMessages1)
   }
 
-  def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int, compression: CompressionCodec = NoCompressionCodec): List[Message] = {
+  def sendMessagesToBrokerPartition(config: KafkaConfig, 
+                                    topic: String, 
+                                    partition: Int, 
+                                    numMessages: Int, 
+                                    compression: CompressionCodec = NoCompressionCodec): List[String] = {
     val header = "test-%d-%d".format(config.brokerId, partition)
     val props = new Properties()
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     props.put("compression.codec", compression.codec.toString)
-    val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
-    val ms = 0.until(numMessages).map(x =>
-      new Message((header + config.brokerId + "-" + partition + "-" + x).getBytes)).toArray
-    producer.send(new ProducerData[Int, Message](topic, partition, ms))
+    props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
+    props.put("serializer.class", classOf[StringEncoder].getName.toString)
+    val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
+    val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x)
+    producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
     debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition))
-    producer
+    producer.close()
     ms.toList
   }
 
-  def sendMessages(config: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec, numParts: Int): List[Message]= {
-    var messages: List[Message] = Nil
+  def sendMessages(config: KafkaConfig, 
+                   messagesPerNode: Int, 
+                   header: String, 
+                   compression: CompressionCodec, 
+                   numParts: Int): List[String]= {
+    var messages: List[String] = Nil
     val props = new Properties()
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
-    val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
-
+    props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
+    props.put("serializer.class", classOf[StringEncoder].getName)
+    val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
     for (partition <- 0 until numParts) {
-      val ms = 0.until(messagesPerNode).map(x =>
-        new Message((header + config.brokerId + "-" + partition + "-" + x).getBytes)).toArray
-      for (message <- ms)
-        messages ::= message
-      producer.send(new ProducerData[Int, Message](topic, partition, ms))
+      val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
+      producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+      messages ++= ms
       debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition))
     }
     producer.close()
-    messages.reverse
+    messages
   }
 
-  def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[Message]= {
-    var messages: List[Message] = Nil
-    for(conf <- configs) {
+  def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[String]= {
+    var messages: List[String] = Nil
+    for(conf <- configs)
       messages ++= sendMessages(conf, messagesPerNode, header, compression, numParts)
-    }
-    messages.sortWith((s,t) => s.checksum < t.checksum)
+    messages
   }
 
-  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[Message]]]): List[Message]= {
-    var messages: List[Message] = Nil
+  def getMessages(nMessagesPerThread: Int, 
+                  topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]= {
+    var messages: List[String] = Nil
     for((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator
@@ -401,11 +383,11 @@ class ZookeeperConsumerConnectorTest ext
           assertTrue(iterator.hasNext)
           val message = iterator.next.message
           messages ::= message
-          debug("received message: " + Utils.readString(message.payload, "UTF-8"))
+          debug("received message: " + message)
         }
       }
     }
-    messages.sortWith((s,t) => s.checksum < t.checksum)
+    messages.reverse
   }
 
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
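
Note: the rewrite above captures the commit's overall direction for these tests: producers are typed over the payload (String) with serializers supplied through config, consumers pass key and value decoders to createMessageStreams, and order-insensitive assertions use plain .sorted on strings instead of sorting Message checksums. A condensed producer-side sketch (the broker address is a placeholder):

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val props = new Properties()
    props.put("broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    // One varargs call sends the whole batch of keyed messages.
    producer.send((0 until 10).map(i => new KeyedMessage[String, String]("topic", "msg-" + i)): _*)
    producer.close()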

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Thu Nov 15 22:15:14 2012
@@ -25,7 +25,8 @@ import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
 import kafka.message.Message
-import kafka.producer.{Producer, ProducerData}
+import kafka.serializer._
+import kafka.producer.{Producer, KeyedMessage}
 
 class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -69,10 +70,11 @@ class AutoOffsetResetTest extends JUnit3
    * Returns the count of messages received.
    */
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
-    val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
+    val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), 
+        new DefaultEncoder(), new StringEncoder())
 
     for(i <- 0 until numMessages)
-      producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
+      producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
     // update offset in zookeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
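
Note: for byte payloads these tests pair DefaultEncoder (identity on Array[Byte]) with a StringEncoder for the key; the argument order in TestUtils.createProducer (value encoder, then key encoder) is inferred from the calls in this diff. The three-argument KeyedMessage carries (topic, key, value):

    import kafka.producer.KeyedMessage

    // Send n raw-byte messages, keyed by the topic name as in the test above.
    def sendRaw(producer: kafka.producer.Producer[String, Array[Byte]],
                topic: String, n: Int): Unit =
      for (i <- 0 until n)
        producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))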

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Thu Nov 15 22:15:14 2012
@@ -27,7 +27,8 @@ import kafka.message._
 import kafka.server._
 import org.scalatest.junit.JUnit3Suite
 import kafka.consumer._
-import kafka.producer.{ProducerData, Producer}
+import kafka.serializer._
+import kafka.producer.{KeyedMessage, Producer}
 import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
 import kafka.admin.CreateTopicCommand
@@ -38,7 +39,7 @@ class FetcherTest extends JUnit3Suite wi
   val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
     yield new KafkaConfig(props)
-  val messages = new mutable.HashMap[Int, Seq[Message]]
+  val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
@@ -83,10 +84,10 @@ class FetcherTest extends JUnit3Suite wi
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
-      val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
-      val ms = 0.until(messagesPerNode).map(x => new Message((conf.brokerId * 5 + x).toString.getBytes)).toArray
+      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new DefaultEncoder(), new StringEncoder())
+      val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms
-      producer.send(new ProducerData[String, Message](topic, topic, ms))
+      producer.send(ms.map(m => KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
       producer.close()
       count += ms.size
     }
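
Note: KeyedMessage is applied without new above, which suggests a case class with a companion apply. Mapping a batch of payloads onto (topic, key, value) messages and splatting them into a single send call is the recurring pattern:

    import kafka.producer.KeyedMessage

    val payloads: Seq[Array[Byte]] = (0 until 5).map(i => i.toString.getBytes)
    val batch = payloads.map(m => KeyedMessage[String, Array[Byte]]("topic", "topic", m))
    // producer.send(batch: _*)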

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Thu Nov 15 22:15:14 2012
@@ -21,10 +21,11 @@ import kafka.api.FetchRequestBuilder
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
+import org.junit.Assert._
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.producer.ProducerData
-import kafka.utils.TestUtils
+import kafka.producer.KeyedMessage
+import kafka.utils._
 import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
 
 /**
@@ -57,17 +58,17 @@ class LazyInitProducerTest extends JUnit
   def testProduceAndFetch() {
     // send some messages
     val topic = "test"
-    val sentMessages = List(new Message("hello".getBytes()), new Message("there".getBytes()))
-    val producerData = new ProducerData[String, Message](topic, topic, sentMessages)
+    val sentMessages = List("hello", "there")
+    val producerData = sentMessages.map(m => new KeyedMessage[String, String](topic, topic, m))
 
-    producer.send(producerData)
+    producer.send(producerData:_*)
 
     var fetchedMessage: ByteBufferMessageSet = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       fetchedMessage = fetched.messageSet(topic, 0)
     }
-    TestUtils.checkEquals(sentMessages.iterator, fetchedMessage.map(m => m.message).iterator)
+    assertEquals(sentMessages, fetchedMessage.map(m => Utils.readString(m.message.payload)).toList)
 
     // send an invalid offset
     try {
@@ -83,12 +84,12 @@ class LazyInitProducerTest extends JUnit
     // send some messages, with non-ordered topics
     val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
-      val messages = new mutable.HashMap[String, Seq[Message]]
+      val messages = new mutable.HashMap[String, Seq[String]]
       val builder = new FetchRequestBuilder()
       for( (topic, offset) <- topicOffsets) {
-        val producedData = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+        val producedData = List("a_" + topic, "b_" + topic)
         messages += topic -> producedData
-        producer.send(new ProducerData[String, Message](topic, topic, producedData))
+        producer.send(producedData.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
         builder.addFetch(topic, offset, 0, 10000)
       }
 
@@ -97,7 +98,7 @@ class LazyInitProducerTest extends JUnit
       val response = consumer.fetch(request)
       for( (topic, offset) <- topicOffsets) {
         val fetched = response.messageSet(topic, offset)
-        TestUtils.checkEquals(messages(topic).iterator, fetched.map(m => m.message).iterator)
+        assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
       }
     }
 
@@ -121,13 +122,13 @@ class LazyInitProducerTest extends JUnit
   def testMultiProduce() {
     // send some messages
     val topics = List("test1", "test2", "test3");
-    val messages = new mutable.HashMap[String, Seq[Message]]
+    val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[ProducerData[String, Message]] = Nil
+    var produceList: List[KeyedMessage[String, String]] = Nil
     for(topic <- topics) {
-      val set = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+      val set = List("a_" + topic, "b_" + topic)
       messages += topic -> set
-      produceList ::= new ProducerData[String, Message](topic, topic, set)
+      produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
       builder.addFetch(topic, 0, 0, 10000)
     }
     producer.send(produceList: _*)
@@ -137,20 +138,20 @@ class LazyInitProducerTest extends JUnit
     val response = consumer.fetch(request)
     for(topic <- topics) {
       val fetched = response.messageSet(topic, 0)
-      TestUtils.checkEquals(messages(topic).iterator, fetched.map(m => m.message).iterator)
+      assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
     }
   }
 
   def testMultiProduceResend() {
     // send some messages
     val topics = List("test1", "test2", "test3");
-    val messages = new mutable.HashMap[String, Seq[Message]]
+    val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[ProducerData[String, Message]] = Nil
+    var produceList: List[KeyedMessage[String, String]] = Nil
     for(topic <- topics) {
-      val set = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+      val set = List("a_" + topic, "b_" + topic)
       messages += topic -> set
-      produceList ::= new ProducerData[String, Message](topic, topic, set)
+      produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
       builder.addFetch(topic, 0, 0, 10000)
     }
     producer.send(produceList: _*)
@@ -161,9 +162,7 @@ class LazyInitProducerTest extends JUnit
     val response = consumer.fetch(request)
     for(topic <- topics) {
       val topicMessages = response.messageSet(topic, 0)
-      TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator,
-                                                      messages(topic).iterator),
-                            topicMessages.iterator.map(_.message))
+      assertEquals(messages(topic) ++ messages(topic), topicMessages.map(m => Utils.readString(m.message.payload)))
     }
   }
 }
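
Note: the multi-topic tests now accumulate one flat List[KeyedMessage[String, String]] and send it in a single varargs call instead of building one ProducerData per topic. A condensed sketch of that accumulation, following the loop above:

    import kafka.producer.KeyedMessage

    var produceList: List[KeyedMessage[String, String]] = Nil
    for (topic <- List("test1", "test2", "test3"))
      produceList ++= List("a_" + topic, "b_" + topic).map(new KeyedMessage[String, String](topic, topic, _))
    // producer.send(produceList: _*)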

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Thu Nov 15 22:15:14 2012
@@ -22,8 +22,9 @@ import junit.framework.Assert._
 import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import java.util.Properties
-import kafka.producer.{ProducerData, Producer, ProducerConfig}
-import kafka.serializer.StringDecoder
+import kafka.utils.Utils
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer._
 import kafka.message.Message
 import kafka.utils.TestUtils
 import org.apache.log4j.{Level, Logger}
@@ -91,7 +92,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
-    stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
+    stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
 
     val replica = servers.head.replicaManager.getReplica(topic, 0).get
     assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
@@ -108,30 +109,26 @@ class PrimitiveApiTest extends JUnit3Sui
     assertTrue(messageSet.iterator.hasNext)
 
     val fetchedMessageAndOffset = messageSet.head
-    val stringDecoder = new StringDecoder
-    val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
-    assertEquals("test-message", fetchedStringMessage)
+    assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
   }
 
   def testDefaultEncoderProducerAndFetchWithCompression() {
     val topic = "test-topic"
     val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("serializer.class", classOf[StringEncoder].getName.toString)
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     props.put("compression", "true")
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
-    stringProducer1.send(new ProducerData[String, String](topic, Array("test-message")))
+    stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
 
     var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
     val messageSet = fetched.messageSet(topic, 0)
     assertTrue(messageSet.iterator.hasNext)
 
     val fetchedMessageAndOffset = messageSet.head
-    val stringDecoder = new StringDecoder
-    val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
-    assertEquals("test-message", fetchedStringMessage)
+    assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
   }
 
   def testProduceAndMultiFetch() {
@@ -140,22 +137,21 @@ class PrimitiveApiTest extends JUnit3Sui
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
-      val messages = new mutable.HashMap[String, Seq[Message]]
+      val messages = new mutable.HashMap[String, Seq[String]]
       val builder = new FetchRequestBuilder()
       for( (topic, partition) <- topics) {
-        val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-        val producerData = new ProducerData[String, Message](topic, topic, messageList)
+        val messageList = List("a_" + topic, "b_" + topic)
+        val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
         messages += topic -> messageList
-        producer.send(producerData)
+        producer.send(producerData:_*)
         builder.addFetch(topic, partition, 0, 10000)
-    }
+      }
 
-      // wait a bit for produced message to be available
       val request = builder.build()
       val response = consumer.fetch(request)
-      for( (topic, partition) <- topics) {
+      for((topic, partition) <- topics) {
         val fetched = response.messageSet(topic, partition)
-        TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
+        assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
       }
     }
 
@@ -204,13 +200,13 @@ class PrimitiveApiTest extends JUnit3Sui
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
-      val messages = new mutable.HashMap[String, Seq[Message]]
+      val messages = new mutable.HashMap[String, Seq[String]]
       val builder = new FetchRequestBuilder()
       for( (topic, partition) <- topics) {
-        val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-        val producerData = new ProducerData[String, Message](topic, topic, messageList)
+        val messageList = List("a_" + topic, "b_" + topic)
+        val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
         messages += topic -> messageList
-        producer.send(producerData)
+        producer.send(producerData:_*)
         builder.addFetch(topic, partition, 0, 10000)
       }
 
@@ -219,7 +215,7 @@ class PrimitiveApiTest extends JUnit3Sui
       val response = consumer.fetch(request)
       for( (topic, partition) <- topics) {
         val fetched = response.messageSet(topic, partition)
-        TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
+        assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
       }
     }
 
@@ -267,14 +263,14 @@ class PrimitiveApiTest extends JUnit3Sui
 
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    val messages = new mutable.HashMap[String, Seq[Message]]
+    val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[ProducerData[String, Message]] = Nil
+    var produceList: List[KeyedMessage[String, String]] = Nil
     for( (topic, partition) <- topics) {
-      val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-      val producerData = new ProducerData[String, Message](topic, topic, messageList)
+      val messageList = List("a_" + topic, "b_" + topic)
+      val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
       messages += topic -> messageList
-      producer.send(producerData)
+      producer.send(producerData:_*)
       builder.addFetch(topic, partition, 0, 10000)
     }
     producer.send(produceList: _*)
@@ -284,21 +280,21 @@ class PrimitiveApiTest extends JUnit3Sui
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
       val fetched = response.messageSet(topic, partition)
-      TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
+      assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
     }
   }
 
   def testMultiProduceWithCompression() {
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    val messages = new mutable.HashMap[String, Seq[Message]]
+    val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[ProducerData[String, Message]] = Nil
+    var produceList: List[KeyedMessage[String, String]] = Nil
     for( (topic, partition) <- topics) {
-      val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-      val producerData = new ProducerData[String, Message](topic, topic, messageList)
+      val messageList = List("a_" + topic, "b_" + topic)
+      val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _))
       messages += topic -> messageList
-      producer.send(producerData)
+      producer.send(producerData:_*)
       builder.addFetch(topic, partition, 0, 10000)
     }
     producer.send(produceList: _*)
@@ -308,7 +304,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
       val fetched = response.messageSet(topic, 0)
-      TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
+      assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
     }
   }
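
Note: with the typed StringDecoder dropped from the fetch path, these assertions decode each fetched payload back to a String via Utils.readString (the one-argument form is used interchangeably with the explicit "UTF-8" overload in this diff) and compare plain string sequences:

    import kafka.message.ByteBufferMessageSet
    import kafka.utils.Utils

    // Decode every message payload in a fetched set for comparison.
    def decodeAll(set: ByteBufferMessageSet): List[String] =
      set.map(m => Utils.readString(m.message.payload)).toList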
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala Thu Nov 15 22:15:14 2012
@@ -23,11 +23,12 @@ import java.util.Properties
 import kafka.producer.{ProducerConfig, Producer}
 import kafka.message.Message
 import kafka.utils.TestUtils
+import kafka.serializer._
 
 trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
     val port: Int
     val host = "localhost"
-    var producer: Producer[String, Message] = null
+    var producer: Producer[String, String] = null
     var consumer: SimpleConsumer = null
 
   override def setUp() {
@@ -41,6 +42,7 @@ trait ProducerConsumerTestHarness extend
       props.put("producer.retry.backoff.ms", "1000")
       props.put("producer.num.retries", "3")
       props.put("producer.request.required.acks", "-1")
+      props.put("serializer.class", classOf[StringEncoder].getName.toString)
       producer = new Producer(new ProducerConfig(props))
       consumer = new SimpleConsumer(host,
                                    port,
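
Note: the harness now types the producer over String and supplies the value serializer through producer config rather than constructing Message objects in tests. A minimal sketch of that property (classOf[StringEncoder].getName already returns a String):

    import java.util.Properties
    import kafka.serializer.StringEncoder

    val props = new Properties()
    props.put("serializer.class", classOf[StringEncoder].getName)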

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Thu Nov 15 22:15:14 2012
@@ -24,7 +24,10 @@ import org.scalatest.junit.JUnit3Suite
 import scala.collection.JavaConversions._
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
-import kafka.javaapi.producer.{ProducerData, Producer}
+import kafka.serializer._
+import kafka.producer.KeyedMessage
+import kafka.javaapi.producer.Producer
+import kafka.utils.IntEncoder
 import kafka.utils.TestUtils._
 import kafka.utils.{Utils, Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
@@ -60,43 +63,46 @@ class ZookeeperConsumerConnectorTest ext
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
 
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
-    assertEquals(sentMessages1, receivedMessages1)
+    assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
 
     zkConsumerConnector1.shutdown
     info("all consumer connectors stopped")
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
-    var messages: List[Message] = Nil
-    val producer: kafka.producer.Producer[Int, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs))
-    val javaProducer: Producer[Int, Message] = new kafka.javaapi.producer.Producer(producer)
+  def sendMessages(conf: KafkaConfig, 
+                   messagesPerNode: Int, 
+                   header: String, 
+                   compressed: CompressionCodec): List[String] = {
+    var messages: List[String] = Nil
+    val producer: kafka.producer.Producer[Int, String] = 
+      TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder(), new IntEncoder())
+    val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
     for (partition <- 0 until numParts) {
-      val ms = 0.until(messagesPerNode).map(x =>
-        new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
-      for (message <- ms)
-        messages ::= message
+      val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
+      messages ++= ms
       import scala.collection.JavaConversions._
-      javaProducer.send(new ProducerData[Int, Message](topic, partition, asList(ms)))
+      javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _))))
     }
     javaProducer.close
     messages
   }
 
-  def sendMessages(messagesPerNode: Int, header: String, compressed: CompressionCodec = NoCompressionCodec): List[Message]= {
-    var messages: List[Message] = Nil
-    for(conf <- configs) {
+  def sendMessages(messagesPerNode: Int, 
+                   header: String, 
+                   compressed: CompressionCodec = NoCompressionCodec): List[String] = {
+    var messages: List[String] = Nil
+    for(conf <- configs)
       messages ++= sendMessages(conf, messagesPerNode, header, compressed)
-    }
-    messages.sortWith((s,t) => s.checksum < t.checksum)
+    messages
   }
 
-  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[Message]]])
-  : List[Message]= {
-    var messages: List[Message] = Nil
+  def getMessages(nMessagesPerThread: Int, 
+                  jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
+    var messages: List[String] = Nil
     val topicMessageStreams = asMap(jTopicMessageStreams)
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
@@ -105,11 +111,11 @@ class ZookeeperConsumerConnectorTest ext
           assertTrue(iterator.hasNext)
           val message = iterator.next.message
           messages ::= message
-          debug("received message: " + Utils.readString(message.payload, "UTF-8"))
+          debug("received message: " + message)
         }
       }
     }
-    messages.sortWith((s,t) => s.checksum < t.checksum)
+    messages
   }
 
   private def toJavaMap(scalaMap: Map[String, Int]): java.util.Map[String, java.lang.Integer] = {
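
Note: the Java-API test now constructs the typed Scala producer first and wraps it, sending a java.util.List of KeyedMessage through the wrapper as shown above. A sketch of the wrapping step:

    // Wrap a typed Scala producer for use through the Java API.
    def wrap(p: kafka.producer.Producer[Int, String]): kafka.javaapi.producer.Producer[Int, String] =
      new kafka.javaapi.producer.Producer(p)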

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Thu Nov 15 22:15:14 2012
@@ -169,10 +169,9 @@ class KafkaLog4jAppenderTest extends JUn
   }
 }
 
-class AppenderStringEncoder extends Encoder[LoggingEvent] {
-  def toMessage(event: LoggingEvent):Message = {
-    val logMessage = event.getMessage
-    new Message(logMessage.asInstanceOf[String].getBytes)
+class AppenderStringEncoder(encoding: String = "UTF-8") extends Encoder[LoggingEvent] {
+  def toBytes(event: LoggingEvent): Array[Byte] = {
+    event.getMessage.toString.getBytes(encoding)
   }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Thu Nov 15 22:15:14 2012
@@ -25,14 +25,14 @@ import org.junit.Test
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
-import kafka.message.Message
+import kafka.message._
 import kafka.producer.async._
-import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
+import kafka.serializer._
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.Map
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.ArrayBuffer
 import kafka.utils._
 
 class AsyncProducerTest extends JUnit3Suite {
@@ -52,7 +52,7 @@ class AsyncProducerTest extends JUnit3Su
     // a mock event handler that blocks
     val mockEventHandler = new EventHandler[String,String] {
 
-      def handle(events: Seq[ProducerData[String,String]]) {
+      def handle(events: Seq[KeyedMessage[String,String]]) {
         Thread.sleep(500)
       }
 
@@ -116,7 +116,7 @@ class AsyncProducerTest extends JUnit3Su
     EasyMock.expectLastCall
     EasyMock.replay(mockHandler)
 
-    val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
+    val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
     val producerSendThread =
       new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5)
     producerSendThread.start()
@@ -141,7 +141,7 @@ class AsyncProducerTest extends JUnit3Su
     EasyMock.replay(mockHandler)
 
     val queueExpirationTime = 200
-    val queue = new LinkedBlockingQueue[ProducerData[String,String]](10)
+    val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
     val producerSendThread =
       new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5)
     producerSendThread.start()
@@ -156,12 +156,12 @@ class AsyncProducerTest extends JUnit3Su
 
   @Test
   def testPartitionAndCollateEvents() {
-    val producerDataList = new ListBuffer[ProducerData[Int,Message]]
-    producerDataList.append(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)))
-    producerDataList.append(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
-    producerDataList.append(new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
-    producerDataList.append(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
-    producerDataList.append(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
+    val producerDataList = new ArrayBuffer[KeyedMessage[Int,Message]]
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
 
     val props = new Properties()
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
@@ -185,20 +185,18 @@ class AsyncProducerTest extends JUnit3Su
 
     val producerPool = new ProducerPool(config)
     val handler = new DefaultEventHandler[Int,String](config,
-                                                      partitioner = intPartitioner,
-                                                      encoder = null.asInstanceOf[Encoder[String]],
-                                                      producerPool = producerPool,
-                                                      topicPartitionInfos)
+                                                         partitioner = intPartitioner,
+                                                         encoder = null.asInstanceOf[Encoder[String]],
+                                                         keyEncoder = new IntEncoder(),
+                                                         producerPool = producerPool,
+                                                         topicPartitionInfos)
 
-    val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]]
-    topic1Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
-                                     new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes))))
-    val topic1Broker2Data = new ListBuffer[ProducerData[Int,Message]]
-    topic1Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes))))
-    val topic2Broker1Data = new ListBuffer[ProducerData[Int,Message]]
-    topic2Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes))))
-    val topic2Broker2Data = new ListBuffer[ProducerData[Int,Message]]
-    topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
+    val topic1Broker1Data = 
+      ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
+                                             new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
+    val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
+    val topic2Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
+    val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
     val expectedResult = Some(Map(
         0 -> Map(
               TopicAndPartition("topic1", 0) -> topic1Broker1Data,
@@ -214,7 +212,7 @@ class AsyncProducerTest extends JUnit3Su
 
   @Test
   def testSerializeEvents() {
-    val produceData = TestUtils.getMsgStrings(5).map(m => new ProducerData[String,String]("topic1",m))
+    val produceData = TestUtils.getMsgStrings(5).map(m => new KeyedMessage[String,String]("topic1",m))
     val props = new Properties()
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
@@ -228,20 +226,20 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = new StringEncoder,
+                                                         keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
                                                          topicPartitionInfos
     )
 
     val serializedData = handler.serialize(produceData)
-    val decoder = new StringDecoder
-    val deserializedData = serializedData.map(d => new ProducerData[String,String](d.getTopic, d.getData.map(m => decoder.toEvent(m))))
+    val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
     TestUtils.checkEquals(produceData.iterator, deserializedData.iterator)
   }
 
   @Test
   def testInvalidPartition() {
-    val producerDataList = new ListBuffer[ProducerData[String,Message]]
-    producerDataList.append(new ProducerData[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
+    val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
+    producerDataList.append(new KeyedMessage[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
     val props = new Properties()
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
     val config = new ProducerConfig(props)
@@ -257,6 +255,7 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = new NegativePartitioner,
                                                          encoder = null.asInstanceOf[Encoder[String]],
+                                                         keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
                                                          topicPartitionInfos)
     try {
@@ -282,11 +281,12 @@ class AsyncProducerTest extends JUnit3Su
 
     val producerPool = new ProducerPool(config)
 
-    val producerDataList = new ListBuffer[ProducerData[String,String]]
-    producerDataList.append(new ProducerData[String,String]("topic1", "msg1"))
+    val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
+    producerDataList.append(new KeyedMessage[String,String]("topic1", "msg1"))
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = new StringEncoder,
+                                                         keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
                                                          topicPartitionInfos)
     try {
@@ -333,12 +333,13 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = null.asInstanceOf[Encoder[String]],
+                                                         keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
                                                          topicPartitionInfos)
-    val producerDataList = new ListBuffer[ProducerData[String,Message]]
-    producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes)))
-    producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
-    producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg3".getBytes)))
+    val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
+    producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
+    producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
+    producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg3".getBytes)))
 
     val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
     partitionedDataOpt match {
@@ -375,13 +376,14 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[String,String]( config,
                                                           partitioner = null.asInstanceOf[Partitioner[String]],
                                                           encoder = new StringEncoder,
+                                                          keyEncoder = new StringEncoder,
                                                           producerPool = producerPool,
                                                           topicPartitionInfos)
 
     val producer = new Producer[String, String](config, handler)
     try {
       // send all 10 messages, should create 2 batches and 2 syncproducer calls
-      producer.send(msgs.map(m => new ProducerData[String,String](topic, List(m))): _*)
+      producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
       producer.close
 
     } catch {
@@ -392,7 +394,8 @@ class AsyncProducerTest extends JUnit3Su
   @Test
   def testFailedSendRetryLogic() {
     val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("serializer.class", classOf[StringEncoder].getName.toString)
+    props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
 
     val config = new ProducerConfig(props)
@@ -407,11 +410,11 @@ class AsyncProducerTest extends JUnit3Su
     // produce request for topic1 and partitions 0 and 1.  Let the first request fail
     // entirely.  The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
-    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0)
+    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
     val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
           (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
-    val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs))
+    val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
     val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
@@ -429,11 +432,11 @@ class AsyncProducerTest extends JUnit3Su
 
     val handler = new DefaultEventHandler[Int,String](config,
                                                       partitioner = new FixedValuePartitioner(),
-                                                      encoder = new StringEncoder,
+                                                      encoder = new StringEncoder(),
+                                                      keyEncoder = new NullEncoder[Int](),
                                                       producerPool = producerPool,
                                                       topicPartitionInfos)
-    val data = List(new ProducerData[Int,String](topic1, 0, msgs),
-                    new ProducerData[Int,String](topic1, 1, msgs))
+    val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
     handler.handle(data)
     handler.close()
 
@@ -445,12 +448,8 @@ class AsyncProducerTest extends JUnit3Su
   def testJavaProducer() {
     val topic = "topic1"
     val msgs = TestUtils.getMsgStrings(5)
-    val scalaProducerData = msgs.map(m => new ProducerData[String, String](topic, List(m)))
-    val javaProducerData = scala.collection.JavaConversions.asList(msgs.map(m => {
-        val javaList = new LinkedList[String]()
-        javaList.add(m)
-        new kafka.javaapi.producer.ProducerData[String, String](topic, javaList)
-      }))
+    val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m))
+    val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData)
 
     val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
     mockScalaProducer.send(scalaProducerData.head)
@@ -480,10 +479,10 @@ class AsyncProducerTest extends JUnit3Su
     }
   }
 
-  def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
-    val producerDataList = new ListBuffer[ProducerData[String,String]]
+  def getProduceData(nEvents: Int): Seq[KeyedMessage[String,String]] = {
+    val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
     for (i <- 0 until nEvents)
-      producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
+      producerDataList.append(new KeyedMessage[String,String]("topic1", null, "msg" + i))
     producerDataList
   }
 
@@ -495,4 +494,16 @@ class AsyncProducerTest extends JUnit3Su
     val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
+  
+  def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
+    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(m.getBytes)): _*)
+  }
+  
+  def messagesToSet(key: Array[Byte], messages: Seq[Array[Byte]]): ByteBufferMessageSet = {
+    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(key = key, bytes = m)): _*)
+  }
+}
+
+class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
+  def partition(data: String, numPartitions: Int): Int = -1
 }

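The sweep from ProducerData to KeyedMessage in this test is the central API change: a KeyedMessage carries one topic, an optional key, and exactly one value, so a batch becomes several messages in a single varargs send rather than one ProducerData holding a value list. A hedged sketch of the resulting call pattern (config assumed built as in the tests above):

    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("topic1", "key1", "msg1"),
                  new KeyedMessage[String, String]("topic1", "msg2"))  // keyless variant
    producer.close()
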
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Thu Nov 15 22:15:14 2012
@@ -97,7 +97,7 @@ class ProducerTest extends JUnit3Suite w
     val producerConfig1 = new ProducerConfig(props1)
     val producer1 = new Producer[String, String](producerConfig1)
     try{
-      producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
       fail("Test should fail because the broker list provided are not valid")
     } catch {
       case e: KafkaException =>
@@ -112,7 +112,7 @@ class ProducerTest extends JUnit3Suite w
     val producerConfig2= new ProducerConfig(props2)
     val producer2 = new Producer[String, String](producerConfig2)
     try{
-      producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      producer2.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
     } catch {
       case e => fail("Should succeed sending the message", e)
     } finally {
@@ -125,7 +125,7 @@ class ProducerTest extends JUnit3Suite w
     val producerConfig3 = new ProducerConfig(props3)
     val producer3 = new Producer[String, String](producerConfig3)
     try{
-      producer3.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      producer3.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
     } catch {
       case e => fail("Should succeed sending the message", e)
     } finally {
@@ -159,8 +159,8 @@ class ProducerTest extends JUnit3Suite w
     val producer1 = new Producer[String, String](producerConfig1)
     val producer2 = new Producer[String, String](producerConfig2)
     // Available partition ids should be 0.
-    producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-    producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test2")))
+    producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
+    producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test2"))
     // get the leader
     val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
     assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
@@ -174,12 +174,12 @@ class ProducerTest extends JUnit3Suite w
       response2.messageSet("new-topic", 0).iterator.toBuffer
     }
     assertEquals("Should have fetched 2 messages", 2, messageSet.size)
-    assertEquals(new Message("test1".getBytes), messageSet(0).message)
-    assertEquals(new Message("test2".getBytes), messageSet(1).message)
+    assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet(0).message)
+    assertEquals(new Message(bytes = "test2".getBytes, key = "test".getBytes), messageSet(1).message)
     producer1.close()
 
     try {
-      producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test2")))
+      producer2.send(new KeyedMessage[String, String]("new-topic", "test", "test2"))
       fail("Should have timed out for 3 acks.")
     }
     catch {
@@ -215,7 +215,7 @@ class ProducerTest extends JUnit3Suite w
     try {
       // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
       // on broker 0
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
     } catch {
       case e => fail("Unexpected exception: " + e)
     }
@@ -226,7 +226,7 @@ class ProducerTest extends JUnit3Suite w
 
     try {
       // These sends should fail since there are no available brokers
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
       fail("Should fail since no leader exists for the partition.")
     } catch {
       case e => // success
@@ -241,7 +241,7 @@ class ProducerTest extends JUnit3Suite w
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
       val messageSet1 = response1.messageSet("new-topic", 0).iterator
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      assertEquals(new Message(bytes = "test1".getBytes, key = "test".getBytes), messageSet1.next.message)
       assertFalse("Message set should have another message", messageSet1.hasNext)
     } catch {
       case e: Exception => fail("Not expected", e)
@@ -270,7 +270,7 @@ class ProducerTest extends JUnit3Suite w
     // do a simple test to make sure plumbing is okay
     try {
       // this message should be assigned to partition 0 whose leader is on broker 0
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+      producer.send(new KeyedMessage[String, String]("new-topic", "test", "test"))
       // cross check if brokers got the messages
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
       val messageSet1 = response1.messageSet("new-topic", 0).iterator
@@ -288,7 +288,7 @@ class ProducerTest extends JUnit3Suite w
     try {
       // this message should be assigned to partition 0 whose leader is on broker 0, but
       // broker 0 will not response within timeoutMs millis.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+      producer.send(new KeyedMessage[String, String]("new-topic", "test", "test"))
     } catch {
       case e: FailedToSendMessageException => /* success */
       case e: Exception => fail("Not expected", e)

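The adjusted assertions above follow from keys now being serialized into the stored message: sending KeyedMessage("new-topic", "test", "test1") through a StringEncoder lands on the broker as a Message carrying both payload and key bytes, and Message equality covers the whole buffer. A minimal sketch of the expected value in a fetch check:

    import kafka.message.Message

    // An expectation built without the key would no longer match what the
    // broker stored for a keyed send.
    val expected = new Message(bytes = "test1".getBytes, key = "test".getBytes)
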
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Thu Nov 15 22:15:14 2012
@@ -20,10 +20,12 @@ import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
 import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
+import kafka.utils.IntEncoder
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
+import kafka.serializer._
 import kafka.message.Message
-import kafka.producer.{ProducerConfig, ProducerData, Producer}
+import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -42,12 +44,17 @@ class LogRecoveryTest extends JUnit3Suit
   val configProps1 = configs.head
   val configProps2 = configs.last
 
-  val message = new Message("hello".getBytes())
+  val message = "hello"
 
-  var producer: Producer[Int, Message] = null
+  var producer: Producer[Int, String] = null
   var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0))
   var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+  
+  val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
+  producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString)
+  producerProps.put("producer.request.required.acks", "-1")
+
 
   def testHWCheckpointNoFailuresSingleLogSegment {
     // start both servers
@@ -55,10 +62,7 @@ class LogRecoveryTest extends JUnit3Suit
     server2 = TestUtils.createServer(configProps2)
     servers ++= List(server1, server2)
 
-    val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
-    producerProps.put("producer.request.timeout.ms", "1000")
-    producerProps.put("producer.request.required.acks", "-1")
-    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+    producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
@@ -92,10 +96,7 @@ class LogRecoveryTest extends JUnit3Suit
     server2 = TestUtils.createServer(configProps2)
     servers ++= List(server1, server2)
 
-    val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
-    producerProps.put("producer.request.timeout.ms", "1000")
-    producerProps.put("producer.request.required.acks", "-1")
-    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+    producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
@@ -152,14 +153,6 @@ class LogRecoveryTest extends JUnit3Suit
   }
 
   def testHWCheckpointNoFailuresMultipleLogSegments {
-    val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
-      override val replicaMaxLagTimeMs = 5000L
-      override val replicaMaxLagBytes = 10L
-      override val flushInterval = 10
-      override val replicaMinBytes = 20
-      override val logFileSize = 30
-    })
-
     // start both servers
     server1 = TestUtils.createServer(configs.head)
     server2 = TestUtils.createServer(configs.last)
@@ -168,10 +161,7 @@ class LogRecoveryTest extends JUnit3Suit
     hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
     hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
 
-    val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
-    producerProps.put("producer.request.timeout.ms", "1000")
-    producerProps.put("producer.request.required.acks", "-1")
-    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+    producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
@@ -197,14 +187,6 @@ class LogRecoveryTest extends JUnit3Suit
   }
 
   def testHWCheckpointWithFailuresMultipleLogSegments {
-    val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
-      override val replicaMaxLagTimeMs = 5000L
-      override val replicaMaxLagBytes = 10L
-      override val flushInterval = 1000
-      override val replicaMinBytes = 20
-      override val logFileSize = 30
-    })
-
     // start both servers
     server1 = TestUtils.createServer(configs.head)
     server2 = TestUtils.createServer(configs.last)
@@ -213,10 +195,7 @@ class LogRecoveryTest extends JUnit3Suit
     hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
     hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
 
-    val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
-    producerProps.put("producer.request.timeout.ms", "1000")
-    producerProps.put("producer.request.required.acks", "-1")
-    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+    producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
     // create topic with 1 partition, 2 replicas, one on each broker
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
@@ -268,6 +247,6 @@ class LogRecoveryTest extends JUnit3Suit
 
   private def sendMessages(n: Int = 1) {
     for(i <- 0 until n)
-      producer.send(new ProducerData[Int, Message](topic, 0, message))
+      producer.send(new KeyedMessage[Int, String](topic, 0, message))
   }
 }

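The hoisted producerProps above also exercises the new key.serializer.class setting: keys and values get independent encoders, so a Producer[Int, String] encodes Int keys with IntEncoder while StringEncoder handles values. A sketch under assumed addresses:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import kafka.serializer.StringEncoder
    import kafka.utils.IntEncoder

    val props = new Properties()
    props.put("broker.list", "localhost:9092")         // assumed broker address
    props.put("serializer.class", classOf[StringEncoder].getName)
    props.put("key.serializer.class", classOf[IntEncoder].getName)
    props.put("producer.request.required.acks", "-1")  // wait for all in-sync replicas
    val producer = new Producer[Int, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[Int, String]("topic", 0, "hello"))
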
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Thu Nov 15 22:15:14 2012
@@ -20,7 +20,7 @@ package kafka.server
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
-import kafka.producer.ProducerData
+import kafka.producer.KeyedMessage
 import kafka.serializer.StringEncoder
 import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils
@@ -55,9 +55,11 @@ class ReplicaFetchTest extends JUnit3Sui
     }
 
     // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder)
-    producer.send(new ProducerData[String, String](topic1, testMessageList1),
-                  new ProducerData[String, String](topic2, testMessageList2))
+    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), 
+                                                            new StringEncoder(), 
+                                                            new StringEncoder())
+    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
+    producer.send(messages:_*)
     producer.close()
 
     def logsMatch(): Boolean = {


