kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: svn commit: r1295861 [2/3] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/...
Date: Thu, 01 Mar 2012 21:15:28 GMT
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Thu Mar  1 21:15:26 2012
@@ -22,15 +22,19 @@ import junit.framework.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import scala.collection._
-import kafka.utils.{Utils, Logging}
-import kafka.utils.{TestZKUtils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer.StringDecoder
+import kafka.admin.CreateTopicCommand
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import kafka.producer.{ProducerConfig, ProducerData, Producer}
+import java.util.{Collections, Properties}
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
+  var dirs : ZKGroupTopicDirs = null
   val zookeeperConnect = TestZKUtils.zookeeperConnect
   val numNodes = 2
   val numParts = 2
@@ -48,25 +52,28 @@ class ZookeeperConsumerConnectorTest ext
   val consumer3 = "consumer3"
   val nMessages = 2
 
+  override def setUp() {
+    super.setUp()
+    dirs = new ZKGroupTopicDirs(group, topic)
+  }
+
   def testBasic() {
     val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
     requestHandlerLogger.setLevel(Level.FATAL)
 
-    var actualMessages: List[Message] = Nil
-
     // test consumer timeout logic
     val consumerConfig0 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
       override val consumerTimeoutMs = 200
     }
     val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
 
     // no messages to consume, we should hit timeout;
     // also the iterator should support re-entrant, so loop it twice
-    for (i <- 0 until  2) {
+    for (i <- 0 until 2) {
       try {
-        getMessagesSortedByChecksum(nMessages*2, topicMessageStreams0)
+        getMessages(nMessages*2, topicMessageStreams0)
         fail("should get an exception")
       }
       catch {
@@ -78,14 +85,26 @@ class ZookeeperConsumerConnectorTest ext
     zkConsumerConnector0.shutdown
 
     // send some messages to each broker
-    val sentMessages1 = sendMessages(nMessages, "batch1")
+    val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
+    val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+    val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    val receivedMessages1 = getMessagesSortedByChecksum(nMessages*2, topicMessageStreams1)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+    assertEquals(sentMessages1.size, receivedMessages1.size)
     assertEquals(sentMessages1, receivedMessages1)
+
+    // also check partition ownership
+    val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
+    val expected_1 = List( ("0", "group1_consumer1-0"),
+                           ("1", "group1_consumer1-0"))
+    assertEquals(expected_1, actual_1)
+
     // commit consumed offsets
     zkConsumerConnector1.commitOffsets
 
@@ -93,15 +112,25 @@ class ZookeeperConsumerConnectorTest ext
     val consumerConfig2 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer2))
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
-    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
     // send some messages to each broker
-    val sentMessages2 = sendMessages(nMessages, "batch2")
+    val sentMessages2_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
+    val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+    val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+
     Thread.sleep(200)
-    val receivedMessages2_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
-    val receivedMessages2_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
+
+    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
     val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
     assertEquals(sentMessages2, receivedMessages2)
 
+    // also check partition ownership
+    val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
+    val expected_2 = List( ("0", "group1_consumer1-0"),
+                           ("1", "group1_consumer2-0"))
+    assertEquals(expected_2, actual_2)
+
     // create a consumer with empty map
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
@@ -109,13 +138,20 @@ class ZookeeperConsumerConnectorTest ext
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
     Thread.sleep(200)
-    val sentMessages3 = sendMessages(nMessages, "batch3")
+    val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
+    val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
+    val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
     Thread.sleep(200)
-    val receivedMessages3_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
-    val receivedMessages3_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
+    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
     val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+    assertEquals(sentMessages3.size, receivedMessages3.size)
     assertEquals(sentMessages3, receivedMessages3)
 
+    // also check partition ownership
+    val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
+    assertEquals(expected_2, actual_3)
+
     zkConsumerConnector1.shutdown
     zkConsumerConnector2.shutdown
     zkConsumerConnector3.shutdown
@@ -127,48 +163,73 @@ class ZookeeperConsumerConnectorTest ext
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
 
-    println("Sending messages for 1st consumer")
     // send some messages to each broker
-    val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
+    val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
+    val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    val receivedMessages1 = getMessagesSortedByChecksum(nMessages*2, topicMessageStreams1)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+    assertEquals(sentMessages1.size, receivedMessages1.size)
     assertEquals(sentMessages1, receivedMessages1)
+
+    // also check partition ownership
+    val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
+    val expected_1 = List( ("0", "group1_consumer1-0"),
+                           ("1", "group1_consumer1-0"))
+    assertEquals(expected_1, actual_1)
+
     // commit consumed offsets
     zkConsumerConnector1.commitOffsets
 
-    println("Sending more messages for 2nd consumer")
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer2))
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
-    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1))
     // send some messages to each broker
-    val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
+    val sentMessages2_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
+    val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+
     Thread.sleep(200)
-    val receivedMessages2_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
-    val receivedMessages2_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
+
+    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
     val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
     assertEquals(sentMessages2, receivedMessages2)
 
+    // also check partition ownership
+    val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
+    val expected_2 = List( ("0", "group1_consumer1-0"),
+                           ("1", "group1_consumer2-0"))
+    assertEquals(expected_2, actual_2)
+
     // create a consumer with empty map
-    println("Sending more messages for 3rd consumer")
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
     Thread.sleep(200)
-    val sentMessages3 = sendMessages(nMessages, "batch3", DefaultCompressionCodec)
+    val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec)
+    val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
     Thread.sleep(200)
-    val receivedMessages3_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1)
-    val receivedMessages3_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2)
+    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
     val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+    assertEquals(sentMessages3.size, receivedMessages3.size)
     assertEquals(sentMessages3, receivedMessages3)
 
+    // also check partition ownership
+    val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
+    assertEquals(expected_2, actual_3)
+
     zkConsumerConnector1.shutdown
     zkConsumerConnector2.shutdown
     zkConsumerConnector3.shutdown
@@ -187,7 +248,10 @@ class ZookeeperConsumerConnectorTest ext
     Thread.sleep(500)
 
     // send some messages to each broker
-    val sentMessages = sendMessages(configs.head, 200, "batch1", DefaultCompressionCodec)
+    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec)
+    val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
+    val sentMessages = (sentMessages1 ++ sentMessages2).sortWith((s,t) => s.checksum < t.checksum)
+
     // test consumer timeout logic
     val consumerConfig0 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
@@ -195,16 +259,30 @@ class ZookeeperConsumerConnectorTest ext
     }
     val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
     val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
-    getMessagesSortedByChecksum(100, topicMessageStreams0)
+    getMessages(100, topicMessageStreams0)
+
+    // also check partition ownership
+    val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
+    val expected_1 = List( ("0", "group1_consumer0-0"),
+                           ("1", "group1_consumer0-0"))
+    assertEquals(expected_1, actual_1)
+
     zkConsumerConnector0.shutdown
     // at this point, only some part of the message set was consumed. So consumed offset should still be 0
     // also fetched offset should be 0
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
-    val receivedMessages = getMessagesSortedByChecksum(400, topicMessageStreams1)
+    val receivedMessages = getMessages(400, topicMessageStreams1)
     val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
     val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
     assertEquals(sortedSentMessages, sortedReceivedMessages)
+
+    // also check partition ownership
+    val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
+    val expected_2 = List( ("0", "group1_consumer0-0"),
+                           ("1", "group1_consumer0-0"))
+    assertEquals(expected_2, actual_2)
+
     zkConsumerConnector1.shutdown
 
     requestHandlerLogger.setLevel(Level.ERROR)
@@ -214,17 +292,18 @@ class ZookeeperConsumerConnectorTest ext
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
 
-    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
-      map(m => Utils.toString(m.payload, "UTF-8")).
+    // send some messages to each broker
+    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec)
+    val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
+    val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.toString(m.payload, "UTF-8")).
       sortWith((s, t) => s.compare(t) == -1)
-    val consumerConfig = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+
+    val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
 
     val zkConsumerConnector =
       new ZookeeperConsumerConnector(consumerConfig, true)
     val topicMessageStreams =
-      zkConsumerConnector.createMessageStreams(
-        Predef.Map(topic -> numNodes*numParts/2), new StringDecoder)
+      zkConsumerConnector.createMessageStreams(Predef.Map(topic -> 1), new StringDecoder)
 
     var receivedMessages: List[String] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
@@ -245,31 +324,106 @@ class ZookeeperConsumerConnectorTest ext
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec): List[Message]= {
+  def testLeaderSelectionForPartition() {
+    val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
+
+    // create topic topic1 with 1 partition on broker 0
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+
+    // send some messages to each broker
+    val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
+
+    // create a consumer
+    val consumerConfig1 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+    val topicRegistry = zkConsumerConnector1.getTopicRegistry
+    assertEquals(1, topicRegistry.map(r => r._1).size)
+    assertEquals(topic, topicRegistry.map(r => r._1).head)
+    val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._1)))
+    val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
+    assertEquals(0, brokerPartition.brokerId)
+    assertEquals(0, brokerPartition.partId)
+
+    // also check partition ownership
+    val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
+    val expected_1 = List( ("0", "group1_consumer1-0"))
+    assertEquals(expected_1, actual_1)
+
+    val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
+    assertEquals(nMessages, receivedMessages1.size)
+    assertEquals(sentMessages1, receivedMessages1)
+  }
+
+  def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int,
+                                    compression: CompressionCodec = NoCompressionCodec): List[Message] = {
+    val header = "test-%d-%d".format(config.brokerId, partition)
+    val props = new Properties()
+    props.put("zk.connect", zkConnect)
+    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
+    props.put("compression.codec", compression.codec.toString)
+    val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
+    val ms = 0.until(numMessages).map(x =>
+      new Message((header + config.brokerId + "-" + partition + "-" + x).getBytes)).toArray
+    producer.send(new ProducerData[Int, Message](topic, partition, ms))
+    debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition))
+    producer.close()
+    ms.toList
+  }
+
+  def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec, numParts: Int): List[Message]= {
     var messages: List[Message] = Nil
-    val producer = TestUtils.createProducer("localhost", conf.port)
+    val props = new Properties()
+    props.put("zk.connect", zkConnect)
+    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
+    val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props))
+
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x =>
         new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
-      val mSet = new ByteBufferMessageSet(compressionCodec = compression, messages = ms: _*)
       for (message <- ms)
         messages ::= message
-      producer.send(topic, partition, mSet)
+      producer.send(new ProducerData[Int, Message](topic, partition, ms))
+      debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, conf.brokerId, topic, partition))
     }
     producer.close()
-    messages
+    messages.reverse
   }
 
   def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
     for(conf <- configs) {
-      messages ++= sendMessages(conf, messagesPerNode, header, compression)
+      messages ++= sendMessages(conf, messagesPerNode, header, compression, numParts)
     }
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessagesSortedByChecksum(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
-    val messages = TestUtils.getConsumedMessages(nMessagesPerThread, topicMessageStreams)
+  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
+    var messages: List[Message] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessagesPerThread) {
+          assertTrue(iterator.hasNext)
+          val message = iterator.next
+          messages ::= message
+          debug("received message: " + Utils.toString(message.payload, "UTF-8"))
+        }
+      }
+    }
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
+
+  def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
+    import scala.collection.JavaConversions
+    val children = zookeeper.client.getChildren(path)
+    Collections.sort(children)
+    val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
+    childrenAsSeq.map(partition =>
+      (partition, zookeeper.client.readData(path + "/" + partition).asInstanceOf[String]))
+  }
+
 }
+
+

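A note on the partition-ownership checks added above: getZKChildrenValues lists the children of the group's owner directory in ZooKeeper and pairs each partition id with the consumer instance that currently owns it. A minimal standalone sketch of the same read, assuming a local ZooKeeper at localhost:2181 (the connect string, group, and topic here are illustrative, not taken from the patch):

    import java.util.Collections
    import scala.collection.JavaConversions
    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer}

    object OwnershipSketch {
      def main(args: Array[String]) {
        val zkClient = new ZkClient("localhost:2181", 6000, 30000, ZKStringSerializer)
        val dirs = new ZKGroupTopicDirs("group1", "topic")
        // each child of the owner dir is a partition id; its data is the owner id,
        // e.g. "group1_consumer1-0"
        val children = zkClient.getChildren(dirs.consumerOwnerDir)
        Collections.sort(children)
        for (partition <- JavaConversions.asBuffer(children))
          println(partition + " -> " +
            zkClient.readData(dirs.consumerOwnerDir + "/" + partition).asInstanceOf[String])
        zkClient.close()
      }
    }
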
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Thu Mar  1 21:15:26 2012
@@ -18,7 +18,6 @@
 package kafka.integration
 
 import junit.framework.Assert._
-import kafka.zk.ZooKeeperTestHarness
 import java.nio.channels.ClosedByInterruptException
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.utils.{ZKGroupTopicDirs, Logging}
@@ -27,15 +26,16 @@ import kafka.server._
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
+import kafka.message.Message
+import kafka.producer.{Producer, ProducerData}
 
-class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
   val topic = "test_topic"
   val group = "default_group"
   val testConsumer = "consumer"
   val brokerPort = 9892
-  val kafkaConfig = new KafkaConfig(TestUtils.createBrokerConfig(0, brokerPort))
-  var kafkaServer : KafkaServer = null
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, brokerPort)))
   val numMessages = 10
   val largeOffset = 10000
   val smallOffset = -1
@@ -44,7 +44,6 @@ class AutoOffsetResetTest extends JUnit3
 
   override def setUp() {
     super.setUp()
-    kafkaServer = TestUtils.createServer(kafkaConfig)
 
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -53,15 +52,14 @@ class AutoOffsetResetTest extends JUnit3
   override def tearDown() {
     // restore the request handler logger to its original level
     requestHandlerLogger.setLevel(Level.ERROR)
-    kafkaServer.shutdown
     super.tearDown
   }
   
   def testEarliestOffsetResetForward() = {
-    val producer = TestUtils.createProducer("localhost", brokerPort)
+    val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
 
     for(i <- 0 until numMessages) {
-      producer.send(topic, TestUtils.singleMessageSet("test".getBytes()))
+      producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
     }
 
     // update offset in zookeeper for consumer to jump "forward" in time
@@ -71,7 +69,7 @@ class AutoOffsetResetTest extends JUnit3
     consumerProps.put("consumer.timeout.ms", "2000")
     val consumerConfig = new ConsumerConfig(consumerProps)
     
-    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", largeOffset)
+    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", largeOffset)
     info("Updated consumer offset to " + largeOffset)
 
     Thread.sleep(500)
@@ -112,10 +110,10 @@ class AutoOffsetResetTest extends JUnit3
   }
 
   def testEarliestOffsetResetBackward() = {
-    val producer = TestUtils.createProducer("localhost", brokerPort)
+    val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
 
     for(i <- 0 until numMessages) {
-      producer.send(topic, TestUtils.singleMessageSet("test".getBytes()))
+      producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
     }
 
     // update offset in zookeeper for consumer to jump "forward" in time
@@ -125,7 +123,7 @@ class AutoOffsetResetTest extends JUnit3
     consumerProps.put("consumer.timeout.ms", "2000")
     val consumerConfig = new ConsumerConfig(consumerProps)
 
-    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", smallOffset)
+    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", smallOffset)
     info("Updated consumer offset to " + smallOffset)
 
 
@@ -145,7 +143,7 @@ class AutoOffsetResetTest extends JUnit3
               }
             }
             catch {
-              case _: InterruptedException => 
+              case _: InterruptedException =>
               case _: ClosedByInterruptException =>
               case e => throw e
             }
@@ -159,16 +157,15 @@ class AutoOffsetResetTest extends JUnit3
 
     threadList(0).join(2000)
 
-    info("Asserting...")
     assertEquals(numMessages, nMessages.get)
     consumerConnector.shutdown
   }
 
   def testLatestOffsetResetForward() = {
-    val producer = TestUtils.createProducer("localhost", brokerPort)
+    val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
 
     for(i <- 0 until numMessages) {
-      producer.send(topic, TestUtils.singleMessageSet("test".getBytes()))
+      producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes())))
     }
 
     // update offset in zookeeper for consumer to jump "forward" in time
@@ -178,7 +175,7 @@ class AutoOffsetResetTest extends JUnit3
     consumerProps.put("consumer.timeout.ms", "2000")
     val consumerConfig = new ConsumerConfig(consumerProps)
 
-    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", largeOffset)
+    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", largeOffset)
     info("Updated consumer offset to " + largeOffset)
 
 
@@ -198,7 +195,7 @@ class AutoOffsetResetTest extends JUnit3
               }
             }
             catch {
-              case _: InterruptedException => 
+              case _: InterruptedException =>
               case _: ClosedByInterruptException =>
               case e => throw e
             }

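The repeated "0-0" to "0" change in this file tracks the new layout of the consumer offset nodes: the leaf is now the partition id alone rather than a brokerId-partition pair. A sketch of the path the updated test writes, reusing the group and topic defined above (the layout comment is inferred from the patch, not a spec):

    import kafka.utils.ZKGroupTopicDirs

    object OffsetPathSketch {
      def main(args: Array[String]) {
        val dirs = new ZKGroupTopicDirs("default_group", "test_topic")
        // before this patch the leaf was "<brokerId>-<partition>" ("0-0");
        // after it, the partition id alone ("0")
        println(dirs.consumerOffsetDir + "/" + "0")
      }
    }
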
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Thu Mar  1 21:15:26 2012
@@ -28,6 +28,7 @@ import kafka.server._
 import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness
 import kafka.utils.TestUtils
+import kafka.producer.{ProducerData, Producer}
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -35,7 +36,7 @@ class FetcherTest extends JUnit3Suite wi
   val configs = 
     for(props <- TestUtils.createBrokerConfigs(numNodes))
       yield new KafkaConfig(props)
-  val messages = new mutable.HashMap[Int, ByteBufferMessageSet]
+  val messages = new mutable.HashMap[Int, Seq[Message]]
   val topic = "topic"
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
@@ -79,11 +80,10 @@ class FetcherTest extends JUnit3Suite wi
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
-      val producer = TestUtils.createProducer("localhost", conf.port)
+      val producer: Producer[String, Message] = TestUtils.createProducer(zkConnect)
       val ms = 0.until(messagesPerNode).map(x => new Message((conf.brokerId * 5 + x).toString.getBytes)).toArray
-      val mSet = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = ms: _*)
-      messages += conf.brokerId -> mSet
-      producer.send(topic, mSet)
+      messages += conf.brokerId -> ms
+      producer.send(new ProducerData[String, Message](topic, topic, ms))
       producer.close()
       count += ms.size
     }

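As in the other tests, the per-broker producer from TestUtils.createProducer("localhost", port) gives way to a ZooKeeper-bootstrapped Producer fed ProducerData. A minimal sketch of the new send path, using only properties that appear in this patch (the connect string is illustrative; StaticPartitioner is the test partitioner the harness below configures):

    import java.util.Properties
    import kafka.message.Message
    import kafka.producer.{Producer, ProducerConfig, ProducerData}

    object SendSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "localhost:2181")                       // brokers discovered via ZK
        props.put("partitioner.class", "kafka.utils.StaticPartitioner") // test helper from this patch
        val producer = new Producer[String, Message](new ProducerConfig(props))
        // the tests pass the topic name as the key
        producer.send(new ProducerData[String, Message]("topic", "topic",
          List(new Message("hello".getBytes))))
        producer.close()
      }
    }
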
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Thu Mar  1 21:15:26 2012
@@ -17,33 +17,31 @@
 
 package kafka.integration
 
-import kafka.api.{FetchRequestBuilder, ProducerRequest}
+import kafka.api.FetchRequestBuilder
 import kafka.common.OffsetOutOfRangeException
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
-import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
-import kafka.utils.{TestUtils, Utils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.message.{Message, ByteBufferMessageSet}
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
+import kafka.utils.TestUtils
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
+import kafka.producer.ProducerData
 
 /**
  * End to end tests of the primitive apis against a local server
  */
-class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness  {
+class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness {
 
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props)
   val configs = List(config)
-  var servers: List[KafkaServer] = null
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
   override def setUp() {
     super.setUp
     if(configs.size <= 0)
       throw new IllegalArgumentException("Must suply at least one server config.")
-    servers = configs.map(TestUtils.createServer(_))
 
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)    
@@ -54,24 +52,21 @@ class LazyInitProducerTest extends JUnit
     requestHandlerLogger.setLevel(Level.ERROR)
 
     super.tearDown    
-    servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDir))
   }
   
   def testProduceAndFetch() {
     // send some messages
     val topic = "test"
-    val sent = new ByteBufferMessageSet(NoCompressionCodec,
-                                        new Message("hello".getBytes()), new Message("there".getBytes()))
-    producer.send(topic, sent)
-    sent.getBuffer.rewind
+    val sentMessages = List(new Message("hello".getBytes()), new Message("there".getBytes()))
+    val producerData = new ProducerData[String, Message](topic, topic, sentMessages)
 
+    producer.send(producerData)
     var fetchedMessage: ByteBufferMessageSet = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       fetchedMessage = fetched.messageSet(topic, 0)
     }
-    TestUtils.checkEquals(sent.iterator, fetchedMessage.iterator)
+    TestUtils.checkEquals(sentMessages.iterator, fetchedMessage.map(m => m.message).iterator)
 
     // send an invalid offset
     try {
@@ -87,14 +82,12 @@ class LazyInitProducerTest extends JUnit
     // send some messages, with non-ordered topics
     val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
-      val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+      val messages = new mutable.HashMap[String, Seq[Message]]
       val builder = new FetchRequestBuilder()
       for( (topic, offset) <- topicOffsets) {
-        val set = new ByteBufferMessageSet(NoCompressionCodec,
-                                           new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-        producer.send(topic, set)
-        set.getBuffer.rewind
-        messages += topic -> set
+        val producedData = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+        messages += topic -> producedData
+        producer.send(new ProducerData[String, Message](topic, topic, producedData))
         builder.addFetch(topic, offset, 0, 10000)
       }
 
@@ -104,7 +97,7 @@ class LazyInitProducerTest extends JUnit
       val response = consumer.fetch(request)
       for( (topic, offset) <- topicOffsets) {
         val fetched = response.messageSet(topic, offset)
-        TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+        TestUtils.checkEquals(messages(topic).iterator, fetched.map(m => m.message).iterator)
       }
     }
 
@@ -121,7 +114,7 @@ class LazyInitProducerTest extends JUnit
           responses.messageSet(topic, offset).iterator
           fail("Expected an OffsetOutOfRangeException exception to be thrown")
         } catch {
-          case e: OffsetOutOfRangeException => 
+          case e: OffsetOutOfRangeException =>
         }
       }
     }
@@ -130,20 +123,16 @@ class LazyInitProducerTest extends JUnit
   def testMultiProduce() {
     // send some messages
     val topics = List("test1", "test2", "test3");
-    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    val messages = new mutable.HashMap[String, Seq[Message]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[ProducerRequest] = Nil
+    var produceList: List[ProducerData[String, Message]] = Nil
     for(topic <- topics) {
-      val set = new ByteBufferMessageSet(NoCompressionCodec,
-                                         new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+      val set = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
       messages += topic -> set
-      produceList ::= new ProducerRequest(topic, 0, set)
+      produceList ::= new ProducerData[String, Message](topic, topic, set)
       builder.addFetch(topic, 0, 0, 10000)
     }
-    producer.multiSend(produceList.toArray)
-
-    for (messageSet <- messages.values)
-      messageSet.getBuffer.rewind
+    producer.send(produceList: _*)
 
     // wait a bit for produced message to be available
     Thread.sleep(200)
@@ -151,39 +140,33 @@ class LazyInitProducerTest extends JUnit
     val response = consumer.fetch(request)
     for(topic <- topics) {
       val fetched = response.messageSet(topic, 0)
-      TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+      TestUtils.checkEquals(messages(topic).iterator, fetched.map(m => m.message).iterator)
     }
   }
 
   def testMultiProduceResend() {
     // send some messages
     val topics = List("test1", "test2", "test3");
-    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    val messages = new mutable.HashMap[String, Seq[Message]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[ProducerRequest] = Nil
+    var produceList: List[ProducerData[String, Message]] = Nil
     for(topic <- topics) {
-      val set = new ByteBufferMessageSet(NoCompressionCodec,
-                                         new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+      val set = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
       messages += topic -> set
-      produceList ::= new ProducerRequest(topic, 0, set)
+      produceList ::= new ProducerData[String, Message](topic, topic, set)
       builder.addFetch(topic, 0, 0, 10000)
     }
-    producer.multiSend(produceList.toArray)
-
-    // resend the same multisend
-    producer.multiSend(produceList.toArray)
-
-    for (messageSet <- messages.values)
-      messageSet.getBuffer.rewind
+    producer.send(produceList: _*)
 
+    producer.send(produceList: _*)
     // wait a bit for produced message to be available
     Thread.sleep(750)
     val request = builder.build()
     val response = consumer.fetch(request)
     for(topic <- topics) {
       val topicMessages = response.messageSet(topic, 0)
-      TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator,
-                                                      messages(topic).map(m => m.message).iterator),
+      TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator,
+                                                      messages(topic).iterator),
                             topicMessages.iterator.map(_.message))
     }
   }

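The multiSend(produceList.toArray) calls removed in this file have no direct replacement; the new Producer.send is variadic, so a batch covering several topics becomes a single call. A sketch under the same assumptions as the previous example:

    import java.util.Properties
    import kafka.message.Message
    import kafka.producer.{Producer, ProducerConfig, ProducerData}

    object MultiSendSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "localhost:2181")                       // illustrative
        props.put("partitioner.class", "kafka.utils.StaticPartitioner")
        val producer = new Producer[String, Message](new ProducerConfig(props))
        val batch = for (topic <- List("test1", "test2", "test3"))
          yield new ProducerData[String, Message](topic, topic,
            List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)))
        producer.send(batch: _*)    // was: producer.multiSend(produceList.toArray)
        producer.close()
      }
    }
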
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Thu Mar  1 21:15:26 2012
@@ -24,10 +24,11 @@ import kafka.api.FetchRequestBuilder
 import kafka.common.InvalidMessageSizeException
 import kafka.consumer.{ZookeeperConsumerConnector, ConsumerConfig}
 import kafka.integration.{KafkaServerTestHarness, ProducerConsumerTestHarness}
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.Message
 import kafka.utils.{Utils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Logger, Level}
+import kafka.producer.ProducerData
 
 class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
   val port = TestUtils.choosePort
@@ -47,8 +48,9 @@ class LogCorruptionTest extends JUnit3Su
     fetcherLogger.setLevel(Level.FATAL)
 
     // send some messages
-    val sent1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("hello".getBytes()))
-    producer.send(topic, sent1)
+    val producerData = new ProducerData[String, Message](topic, topic, List(new Message("hello".getBytes())))
+
+    producer.send(producerData)
     Thread.sleep(200)
 
     // corrupt the file on disk

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Thu Mar  1 21:15:26 2012
@@ -17,25 +17,25 @@
 
 package kafka.integration
 
-import java.io.File
 import java.nio.ByteBuffer
-import java.util.Properties
 import junit.framework.Assert._
-import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder, ProducerRequest}
+import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
 import kafka.common.{FetchRequestFormatException, OffsetOutOfRangeException, InvalidPartitionException}
-import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
-import kafka.producer.{ProducerData, Producer, ProducerConfig}
-import kafka.serializer.StringDecoder
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import kafka.utils.TestUtils
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.producer.{ProducerData, Producer, ProducerConfig}
+import kafka.serializer.StringDecoder
+import kafka.message.Message
+import java.io.File
+import kafka.utils.{TestZKUtils, TestUtils}
 import scala.collection._
 
 /**
  * End to end tests of the primitive apis against a local server
  */
-class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
+class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
   
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
@@ -45,6 +45,20 @@ class PrimitiveApiTest extends JUnit3Sui
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
+//<<<<<<< .mine
+  override def setUp() {
+    super.setUp
+    // temporarily set request handler logger to a higher level
+    requestHandlerLogger.setLevel(Level.FATAL)
+  }
+
+  override def tearDown() {
+    // restore the request handler logger to its original level
+    requestHandlerLogger.setLevel(Level.ERROR)
+
+    super.tearDown
+  }
+
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
       .correlationId(100)
@@ -83,7 +97,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val topic = "test-topic"
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", "0:localhost:" + port)
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
@@ -111,7 +125,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val topic = "test-topic"
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", "0:localhost:" + port)
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     props.put("compression", "true")
     val config = new ProducerConfig(props)
 
@@ -133,14 +147,13 @@ class PrimitiveApiTest extends JUnit3Sui
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
-      val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+      val messages = new mutable.HashMap[String, Seq[Message]]
       val builder = new FetchRequestBuilder()
       for( (topic, partition) <- topics) {
-        val set = new ByteBufferMessageSet(NoCompressionCodec,
-                                           new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-        messages += topic -> set
-        producer.send(topic, set)
-        set.getBuffer.rewind
+        val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+        val producerData = new ProducerData[String, Message](topic, topic, messageList)
+        messages += topic -> messageList
+        producer.send(producerData)
         builder.addFetch(topic, partition, 0, 10000)
       }
 
@@ -150,7 +163,7 @@ class PrimitiveApiTest extends JUnit3Sui
       val response = consumer.fetch(request)
       for( (topic, partition) <- topics) {
         val fetched = response.messageSet(topic, partition)
-        TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+        TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
       }
     }
 
@@ -172,7 +185,7 @@ class PrimitiveApiTest extends JUnit3Sui
       } catch {
         case e: OffsetOutOfRangeException => "this is good"
       }
-    }    
+    }
 
     {
       // send some invalid partitions
@@ -199,14 +212,13 @@ class PrimitiveApiTest extends JUnit3Sui
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
-      val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+      val messages = new mutable.HashMap[String, Seq[Message]]
       val builder = new FetchRequestBuilder()
       for( (topic, partition) <- topics) {
-        val set = new ByteBufferMessageSet(DefaultCompressionCodec,
-                                           new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-        messages += topic -> set
-        producer.send(topic, set)
-        set.getBuffer.rewind
+        val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+        val producerData = new ProducerData[String, Message](topic, topic, messageList)
+        messages += topic -> messageList
+        producer.send(producerData)
         builder.addFetch(topic, partition, 0, 10000)
       }
 
@@ -216,7 +228,7 @@ class PrimitiveApiTest extends JUnit3Sui
       val response = consumer.fetch(request)
       for( (topic, partition) <- topics) {
         val fetched = response.messageSet(topic, partition)
-        TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+        TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
       }
     }
 
@@ -264,48 +276,42 @@ class PrimitiveApiTest extends JUnit3Sui
   def testMultiProduce() {
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    val messages = new mutable.HashMap[String, Seq[Message]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[ProducerRequest] = Nil
+    var produceList: List[ProducerData[String, Message]] = Nil
     for( (topic, partition) <- topics) {
-      val set = new ByteBufferMessageSet(NoCompressionCodec,
-                                         new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-      messages += topic -> set
-      produceList ::= new ProducerRequest(topic, 0, set)
+      val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+      val producerData = new ProducerData[String, Message](topic, topic, messageList)
+      messages += topic -> messageList
+      producer.send(producerData)
       builder.addFetch(topic, partition, 0, 10000)
     }
-    producer.multiSend(produceList.toArray)
+    producer.send(produceList: _*)
 
-    for (messageSet <- messages.values)
-      messageSet.getBuffer.rewind
-      
     // wait a bit for produced message to be available
     Thread.sleep(200)
     val request = builder.build()
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
       val fetched = response.messageSet(topic, partition)
-      TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+      TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
     }
   }
 
   def testMultiProduceWithCompression() {
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    val messages = new mutable.HashMap[String, Seq[Message]]
     val builder = new FetchRequestBuilder()
-    var produceList: List[ProducerRequest] = Nil
+    var produceList: List[ProducerData[String, Message]] = Nil
     for( (topic, partition) <- topics) {
-      val set = new ByteBufferMessageSet(DefaultCompressionCodec,
-                                         new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
-      messages += topic -> set
-      produceList ::= new ProducerRequest(topic, 0, set)
+      val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+      val producerData = new ProducerData[String, Message](topic, topic, messageList)
+      messages += topic -> messageList
+      producer.send(producerData)
       builder.addFetch(topic, partition, 0, 10000)
     }
-    producer.multiSend(produceList.toArray)
-
-    for (messageSet <- messages.values)
-      messageSet.getBuffer.rewind
+    producer.send(produceList: _*)
 
     // wait a bit for produced message to be available
     Thread.sleep(200)
@@ -313,7 +319,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
       val fetched = response.messageSet(topic, 0)
-      TestUtils.checkEquals(messages(topic).iterator, fetched.iterator)
+      TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator)
     }
   }
 

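On the fetch side, the message set now iterates MessageAndOffset pairs, hence the recurring fetched.map(messageAndOffset => messageAndOffset.message) in the updated assertions. A sketch of the consume half of these tests, with an illustrative host and port:

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer

    object FetchSketch {
      def main(args: Array[String]) {
        val consumer = new SimpleConsumer("localhost", 9092, 1000000, 64*1024)
        val request = new FetchRequestBuilder().addFetch("test", 0, 0L, 10000).build()
        val response = consumer.fetch(request)
        // project the messages out of the MessageAndOffset pairs
        for (messageAndOffset <- response.messageSet("test", 0))
          println(messageAndOffset.message)
        consumer.close()
      }
    }
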
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala Thu Mar  1 21:15:26 2012
@@ -20,33 +20,35 @@ package kafka.integration
 import kafka.consumer.SimpleConsumer
 import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
-import kafka.producer.{SyncProducerConfig, SyncProducer}
+import kafka.utils.TestZKUtils
+import kafka.producer.{ProducerConfig, Producer}
+import kafka.message.Message
 
-trait ProducerConsumerTestHarness extends JUnit3Suite {
+trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
   
     val port: Int
     val host = "localhost"
-    var producer: SyncProducer = null
+    var producer: Producer[String, Message] = null
     var consumer: SimpleConsumer = null
 
     override def setUp() {
+      super.setUp
       val props = new Properties()
-      props.put("host", host)
-      props.put("port", port.toString)
+      props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+      props.put("zk.connect", TestZKUtils.zookeeperConnect)
       props.put("buffer.size", "65536")
       props.put("connect.timeout.ms", "100000")
       props.put("reconnect.interval", "10000")
-      producer = new SyncProducer(new SyncProducerConfig(props))
+      producer = new Producer(new ProducerConfig(props))
       consumer = new SimpleConsumer(host,
                                    port,
                                    1000000,
                                    64*1024)
-      super.setUp
     }
 
    override def tearDown() {
-     super.tearDown
      producer.close()
      consumer.close()
+     super.tearDown
    }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Thu Mar  1 21:15:26 2012
@@ -24,10 +24,10 @@ import kafka.utils.{Utils, Logging}
 import kafka.utils.TestUtils
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.JavaConversions._
-import kafka.javaapi.message.ByteBufferMessageSet
 import kafka.consumer.{ConsumerConfig, KafkaMessageStream}
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
+import kafka.javaapi.producer.{ProducerData, Producer}
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -67,16 +67,17 @@ class ZookeeperConsumerConnectorTest ext
 
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
-    val producer = new kafka.javaapi.producer.SyncProducer(TestUtils.createProducer("localhost", conf.port))
+    val producer: kafka.producer.Producer[Int, Message] = TestUtils.createProducer(zkConnect)
+    val javaProducer: Producer[Int, Message] = new kafka.javaapi.producer.Producer(producer)
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x =>
         new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
-      val mSet = new ByteBufferMessageSet(compressionCodec = compressed, messages = getMessageList(ms: _*))
       for (message <- ms)
         messages ::= message
-      producer.send(topic, partition, mSet)
+      import scala.collection.JavaConversions._
+      javaProducer.send(new ProducerData[Int, Message](topic, partition, asList(ms)))
     }
-    producer.close()
+    javaProducer.close()
     messages
   }
 
@@ -106,12 +107,6 @@ class ZookeeperConsumerConnectorTest ext
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  private def getMessageList(messages: Message*): java.util.List[Message] = {
-    val messageList = new java.util.ArrayList[Message]()
-    messages.foreach(m => messageList.add(m))
-    messageList
-  }
-
   private def toJavaMap(scalaMap: Map[String, Int]): java.util.Map[String, java.lang.Integer] = {
     val javaMap = new java.util.HashMap[String, java.lang.Integer]()
     scalaMap.foreach(m => javaMap.put(m._1, m._2.asInstanceOf[java.lang.Integer]))

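The Java-API test now builds its producer by wrapping the Scala one. A minimal sketch of that wrapping, with an illustrative connect string (asList is the same JavaConversions helper this patch imports, and FixedValuePartitioner is the test partitioner the Int-keyed producers above rely on):

    import java.util.Properties
    import kafka.message.Message
    import kafka.producer.ProducerConfig
    import scala.collection.JavaConversions.asList

    object JavaApiSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "localhost:2181")
        props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
        val scalaProducer = new kafka.producer.Producer[Int, Message](new ProducerConfig(props))
        val javaProducer = new kafka.javaapi.producer.Producer[Int, Message](scalaProducer)
        val ms = Array(new Message("m".getBytes))
        javaProducer.send(new kafka.javaapi.producer.ProducerData[Int, Message]("topic", 0, asList(ms)))
        javaProducer.close
      }
    }
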
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Thu Mar  1 21:15:26 2012
@@ -25,6 +25,7 @@ import kafka.common.OffsetOutOfRangeExce
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{TestZKUtils, Utils, MockTime, TestUtils}
 import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
 
 class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -34,6 +35,8 @@ class LogManagerTest extends JUnit3Suite
   var logManager: LogManager = null
   var config:KafkaConfig = null
   val zookeeperConnect = TestZKUtils.zookeeperConnect
+  val name = "kafka"
+  val veryLargeLogFlushInterval = 10000000L
 
   override def setUp() {
     super.setUp()
@@ -41,9 +44,13 @@ class LogManagerTest extends JUnit3Suite
     config = new KafkaConfig(props) {
                    override val logFileSize = 1024
                  }
-    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
     logDir = logManager.logDir
+
+    // set up brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zookeeper.client, name, 3, 1, "0,0,0")
+
   }
 
   override def tearDown() {
@@ -55,7 +62,6 @@ class LogManagerTest extends JUnit3Suite
   
   @Test
   def testCreateLog() {
-    val name = "kafka"
     val log = logManager.getOrCreateLog(name, 0)
     val logFile = new File(config.logDir, name + "-0")
     assertTrue(logFile.exists)
@@ -64,7 +70,6 @@ class LogManagerTest extends JUnit3Suite
 
   @Test
   def testGetLog() {
-    val name = "kafka"
     val log = logManager.getLog(name, 0)
     val logFile = new File(config.logDir, name + "-0")
     assertTrue(!logFile.exists)
@@ -72,7 +77,7 @@ class LogManagerTest extends JUnit3Suite
 
   @Test
   def testCleanupExpiredSegments() {
-    val log = logManager.getOrCreateLog("cleanup", 0)
+    val log = logManager.getOrCreateLog(name, 0)
     var offset = 0L
     for(i <- 0 until 1000) {
       var set = TestUtils.singleMessageSet("test".getBytes())
@@ -111,11 +116,11 @@ class LogManagerTest extends JUnit3Suite
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
       override val logRetentionHours = retentionHours
     }
-    logManager = new LogManager(config, null, time, -1, retentionMs, false)
+    logManager = new LogManager(config, time, veryLargeLogFlushInterval, retentionMs, false)
     logManager.startup
 
     // create a log
-    val log = logManager.getOrCreateLog("cleanup", 0)
+    val log = logManager.getOrCreateLog(name, 0)
     var offset = 0L
 
     // add a bunch of messages that should be larger than the retentionSize
@@ -151,14 +156,14 @@ class LogManagerTest extends JUnit3Suite
     logManager.close
     Thread.sleep(100)
     config = new KafkaConfig(props) {
-                   override val logFileSize = 1024 *1024 *1024 
+                   override val logFileSize = 1024 *1024 *1024
                    override val flushSchedulerThreadRate = 50
                    override val flushInterval = Int.MaxValue
                    override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
                  }
-    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
-    val log = logManager.getOrCreateLog("timebasedflush", 0)
+    val log = logManager.getOrCreateLog(name, 0)
     for(i <- 0 until 200) {
       var set = TestUtils.singleMessageSet("test".getBytes())
       log.append(set)
@@ -177,12 +182,12 @@ class LogManagerTest extends JUnit3Suite
                    override val logFileSize = 256
                    override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
                  }
-    
-    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+
+    logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
-    
-    for(i <- 0 until 2) {
-      val log = logManager.getOrCreateLog("testPartition", i)
+
+    for(i <- 0 until 1) {
+      val log = logManager.getOrCreateLog(name, i)
       for(i <- 0 until 250) {
         var set = TestUtils.singleMessageSet("test".getBytes())
         log.append(set)
@@ -191,7 +196,7 @@ class LogManagerTest extends JUnit3Suite
 
     try
     {
-      val log = logManager.getOrCreateLog("testPartition", 2)
+      val log = logManager.getOrCreateLog(name, 2)
       assertTrue("Should not come here", log != null)
     } catch {
        case _ =>
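
The createTopic calls added to this test register brokers in zookeeper as
partition owners before any log is created. Reading the call made in setUp,
with argument roles inferred from the call sites in this commit (the actual
parameter names may differ):

    // createTopic(zkClient, topic, numPartitions, replicationFactor, replicaAssignment)
    // "0,0,0" places each of the 3 partitions on broker 0, with replication factor 1
    CreateTopicCommand.createTopic(zookeeper.client, "kafka", 3, 1, "0,0,0")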

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Thu Mar  1 21:15:26 2012
@@ -29,6 +29,7 @@ import kafka.message.{NoCompressionCodec
 import org.apache.log4j._
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
 import kafka.api.{FetchRequestBuilder, OffsetRequest}
 
 object LogOffsetTest {
@@ -51,7 +52,7 @@ class LogOffsetTest extends JUnit3Suite 
     val config: Properties = createBrokerConfig(1, brokerPort)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
-    
+
     server = TestUtils.createServer(new KafkaConfig(config))
     simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
   }
@@ -94,10 +95,12 @@ class LogOffsetTest extends JUnit3Suite 
   @Test
   def testGetOffsetsBeforeLatestTime() {
     val topicPartition = "kafka-" + 0
-    val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
+    // set up brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
 
@@ -133,6 +136,9 @@ class LogOffsetTest extends JUnit3Suite 
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
+    // set up brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+
     var offsetChanged = false
     for(i <- 1 to 14) {
       val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
@@ -147,11 +153,13 @@ class LogOffsetTest extends JUnit3Suite 
 
   @Test
   def testGetOffsetsBeforeNow() {
-    val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(10)
-    val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+    val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(3)
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
+    // set up brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
     val message = new Message(Integer.toString(42).getBytes())
@@ -172,11 +180,13 @@ class LogOffsetTest extends JUnit3Suite 
 
   @Test
   def testGetOffsetsBeforeEarliestTime() {
-    val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(10)
-    val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+    val topicPartition = "kafka-" + LogOffsetTest.random.nextInt(3)
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
+    // set up brokers in zookeeper as owners of partitions for this test
+    CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
     val message = new Message(Integer.toString(42).getBytes())
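
These offset tests now create their topics in zookeeper up front and then
query the broker through a SimpleConsumer. A hedged sketch of the offset
lookup they exercise; the getOffsetsBefore argument order (topic, partition,
time, maxNumOffsets) is read off the call sites in this file:

    import kafka.api.OffsetRequest
    import kafka.consumer.SimpleConsumer

    val consumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
    // ask for at most one offset at or before the latest point in the log
    val offsets = consumer.getOffsetsBefore("kafka", 0, OffsetRequest.LatestTime, 1)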

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Thu Mar  1 21:15:26 2012
@@ -26,18 +26,17 @@ import kafka.message.Message
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging}
 import kafka.zk.{EmbeddedZookeeper, ZooKeeperTestHarness}
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.{PropertyConfigurator, Logger}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnit3Suite
+import kafka.utils._
 
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   var logDirZk: File = null
   var logDirBl: File = null
-  //  var topicLogDir: File = null
   var serverBl: KafkaServer = null
   var serverZk: KafkaServer = null
 
@@ -63,7 +62,7 @@ class KafkaLog4jAppenderTest extends JUn
     logDirZk = new File(logDirZkPath)
     serverZk = TestUtils.createServer(new KafkaConfig(propsZk));
 
-    val propsBl: Properties = createBrokerConfig(brokerBl, portBl)
+    val propsBl: Properties = TestUtils.createBrokerConfig(brokerBl, portBl)
     val logDirBlPath = propsBl.getProperty("log.dir")
     logDirBl = new File(logDirBlPath)
     serverBl = TestUtils.createServer(new KafkaConfig(propsBl))
@@ -85,8 +84,6 @@ class KafkaLog4jAppenderTest extends JUn
     Utils.rm(logDirBl)
 
     Thread.sleep(500)
-//    zkServer.shutdown
-//    Thread.sleep(500)
     super.tearDown()
   }
 
@@ -132,7 +129,7 @@ class KafkaLog4jAppenderTest extends JUn
     props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
-    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:"+portBl.toString)
+    props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     // topic missing
@@ -148,7 +145,7 @@ class KafkaLog4jAppenderTest extends JUn
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
     props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
     props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:"+portBl.toString)
+    props.put("log4j.appender.KAFKA.ZkConnect", TestZKUtils.zookeeperConnect)
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
@@ -161,27 +158,6 @@ class KafkaLog4jAppenderTest extends JUn
   }
 
   @Test
-  def testBrokerListLog4jAppends() {
-    PropertyConfigurator.configure(getLog4jConfigWithBrokerList)
-
-    for(i <- 1 to 5)
-      info("test")
-
-    Thread.sleep(2500)
-
-    var offset = 0L
-    val response = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, offset, 1024*1024).build())
-    val fetchedMessage = response.messageSet("test-topic", 0)
-    var count = 0
-    for(message <- fetchedMessage) {
-      count = count + 1
-      offset += message.offset
-    }
-
-    assertEquals(5, count)
-  }
-
-  @Test
   def testZkConnectLog4jAppends() {
     PropertyConfigurator.configure(getLog4jConfigWithZkConnect)
 
@@ -208,18 +184,6 @@ class KafkaLog4jAppenderTest extends JUn
     assertEquals(5, count)
   }
 
-  private def getLog4jConfigWithBrokerList: Properties = {
-    var props = new Properties()
-    props.put("log4j.rootLogger", "INFO")
-    props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
-    props.put("log4j.appender.KAFKA.BrokerList", "0:localhost:"+portBl.toString)
-    props.put("log4j.appender.KAFKA.Topic", "test-topic")
-    props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
-    props
-  }
-
   private def getLog4jConfigWithZkConnect: Properties = {
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
@@ -232,21 +196,6 @@ class KafkaLog4jAppenderTest extends JUn
     props
   }
 
-  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
-    val props = new Properties
-    props.put("brokerid", nodeId.toString)
-    props.put("port", port.toString)
-    props.put("log.dir", getLogDir.getAbsolutePath)
-    props.put("log.flush.interval", "1")
-    props.put("enable.zookeeper", "false")
-    props.put("num.partitions", "1")
-    props.put("log.retention.hours", "10")
-    props.put("log.cleanup.interval.mins", "5")
-    props.put("log.file.size", "1000")
-    props.put("zk.connect", zkConnect.toString)
-    props
-  }
-
   private def getLogDir(): File = {
     val dir = TestUtils.tempDir()
     dir
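
With the BrokerList path removed, the appender is configured purely through
zookeeper. The properties assembled by getLog4jConfigWithZkConnect amount to
the following log4j.properties sketch; the ZkConnect value comes from
TestZKUtils.zookeeperConnect and is shown here as a placeholder:

    log4j.rootLogger=INFO
    log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
    log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
    log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
    log4j.appender.KAFKA.ZkConnect=<zookeeper host:port>
    log4j.appender.KAFKA.Topic=test-topic
    log4j.logger.kafka.log4j=INFO, KAFKA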

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1295861&r1=1295860&r2=1295861&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Thu Mar  1 21:15:26 2012
@@ -18,23 +18,41 @@
 package kafka.producer
 
 import org.easymock.EasyMock
-import kafka.api.ProducerRequest
 import org.junit.Test
-import org.scalatest.junit.JUnitSuite
 import kafka.producer.async._
 import java.util.concurrent.LinkedBlockingQueue
 import junit.framework.Assert._
-import collection.SortedSet
-import kafka.cluster.{Broker, Partition}
-import collection.mutable.{HashMap, ListBuffer}
+import kafka.cluster.Broker
+import collection.mutable.ListBuffer
 import collection.Map
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
 import java.util.{LinkedList, Properties}
-import kafka.utils.{TestZKUtils, TestUtils}
 import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException}
+import kafka.api.{PartitionMetadata, TopicMetadata, TopicMetadataRequest, ProducerRequest}
+import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.TestUtils._
+import kafka.server.KafkaConfig
+import org.I0Itec.zkclient.ZkClient
+
+class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val props = createBrokerConfigs(1)
+  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  var zkClient: ZkClient = null
+  var brokers: Seq[Broker] = null
+
+  override def setUp() {
+    super.setUp()
+    zkClient = zookeeper.client
+    // create brokers in zookeeper
+    brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
+  }
 
-class AsyncProducerTest extends JUnitSuite {
+  override def tearDown() {
+    super.tearDown()
+  }
 
   @Test
   def testProducerQueueSize() {
@@ -50,7 +68,7 @@ class AsyncProducerTest extends JUnitSui
 
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", "0:localhost:9092")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     props.put("producer.type", "async")
     props.put("queue.size", "10")
     props.put("batch.size", "1")
@@ -72,13 +90,13 @@ class AsyncProducerTest extends JUnitSui
   def testProduceAfterClosed() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", "0:localhost:9092")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     props.put("producer.type", "async")
     props.put("batch.size", "1")
 
     val config = new ProducerConfig(props)
     val produceData = getProduceData(10)
-    val producer = new Producer[String, String](config)
+    val producer = new Producer[String, String](config, zkClient)
     producer.close
 
     try {
@@ -157,18 +175,35 @@ class AsyncProducerTest extends JUnitSui
     producerDataList.append(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
 
     val props = new Properties()
-    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+    props.put("zk.connect", zkConnect)
+    val broker1 = new Broker(0, "localhost", "localhost", 9092)
+    val broker2 = new Broker(1, "localhost", "localhost", 9093)
+    // form the expected partition metadata
+    val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
+    val partition2Metadata = new PartitionMetadata(1, Some(broker2), List(broker1, broker2))
+    val topic1Metadata = new TopicMetadata("topic1", List(partition1Metadata, partition2Metadata))
+    val topic2Metadata = new TopicMetadata("topic2", List(partition1Metadata, partition2Metadata))
 
     val intPartitioner = new Partitioner[Int] {
       def partition(key: Int, numPartitions: Int): Int = key % numPartitions
     }
     val config = new ProducerConfig(props)
+
+    val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata))
+
+    val producerPool = EasyMock.createMock(classOf[ProducerPool])
+    producerPool.getZkClient
+    EasyMock.expectLastCall().andReturn(zkClient)
+    producerPool.addProducers(config)
+    EasyMock.expectLastCall()
+    producerPool.getAnyProducer
+    EasyMock.expectLastCall().andReturn(syncProducer).times(2)
+    EasyMock.replay(producerPool)
     val handler = new DefaultEventHandler[Int,String](config,
                                                       partitioner = intPartitioner,
                                                       encoder = null.asInstanceOf[Encoder[String]],
-                                                      producerPool = null,
-                                                      populateProducerPool = false,
-                                                      brokerPartitionInfo = null)
+                                                      producerPool)
+
 
     val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]]
     topic1Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@@ -181,29 +216,34 @@ class AsyncProducerTest extends JUnitSui
     topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
     val expectedResult = Map(
         0 -> Map(
-              ("topic1", -1) -> topic1Broker1Data,
-              ("topic2", -1) -> topic2Broker1Data),
+              ("topic1", 0) -> topic1Broker1Data,
+              ("topic2", 0) -> topic2Broker1Data),
         1 -> Map(
-              ("topic1", -1) -> topic1Broker2Data,
-              ("topic2", -1) -> topic2Broker2Data)
+              ("topic1", 1) -> topic1Broker2Data,
+              ("topic2", 1) -> topic2Broker2Data)
       )
 
     val actualResult = handler.partitionAndCollate(producerDataList)
     assertEquals(expectedResult, actualResult)
+    EasyMock.verify(syncProducer)
+    EasyMock.verify(producerPool)
   }
 
   @Test
   def testSerializeEvents() {
     val produceData = TestUtils.getMsgStrings(5).map(m => new ProducerData[String,String]("topic1",m))
     val props = new Properties()
-    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+    props.put("zk.connect", zkConnect)
     val config = new ProducerConfig(props)
+    // form the expected partition metadata
+    val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092)
+
+    val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
+    val producerPool = getMockProducerPool(config, syncProducer)
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = new StringEncoder,
-                                                         producerPool = null,
-                                                         populateProducerPool = false,
-                                                         brokerPartitionInfo = null)
+                                                         producerPool)
 
     val serializedData = handler.serialize(produceData)
     val decoder = new StringDecoder
@@ -216,14 +256,20 @@ class AsyncProducerTest extends JUnitSui
     val producerDataList = new ListBuffer[ProducerData[String,Message]]
     producerDataList.append(new ProducerData[String,Message]("topic1", "key1", new Message("msg1".getBytes)))
     val props = new Properties()
-    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val config = new ProducerConfig(props)
+
+    // form the expected partition metadata
+    val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092)
+
+    val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
+
+    val producerPool = getMockProducerPool(config, syncProducer)
+
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = new NegativePartitioner,
                                                          encoder = null.asInstanceOf[Encoder[String]],
-                                                         producerPool = null,
-                                                         populateProducerPool = false,
-                                                         brokerPartitionInfo = null)
+                                                         producerPool)
     try {
       handler.partitionAndCollate(producerDataList)
       fail("Should fail with InvalidPartitionException")
@@ -231,34 +277,29 @@ class AsyncProducerTest extends JUnitSui
     catch {
       case e: InvalidPartitionException => // expected, do nothing
     }
+    EasyMock.verify(syncProducer)
+    EasyMock.verify(producerPool)
   }
 
-  private def getMockBrokerPartitionInfo(): BrokerPartitionInfo ={
-    new BrokerPartitionInfo {
-      def getBrokerPartitionInfo(topic: String = null): SortedSet[Partition] = SortedSet.empty[Partition]
-
-      def getBrokerInfo(brokerId: Int): Option[Broker] = None
+  @Test
+  def testNoBroker() {
+    val props = new Properties()
+    props.put("zk.connect", zkConnect)
 
-      def getAllBrokerInfo: Map[Int, Broker] = new HashMap[Int, Broker]
+    val config = new ProducerConfig(props)
+    // create topic metadata with 0 partitions
+    val topic1Metadata = new TopicMetadata("topic1", Seq.empty)
 
-      def updateInfo = {}
+    val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
 
-      def close = {}
-    }
-  }
+    val producerPool = getMockProducerPool(config, syncProducer)
 
-  @Test
-  def testNoBroker() {
     val producerDataList = new ListBuffer[ProducerData[String,String]]
     producerDataList.append(new ProducerData[String,String]("topic1", "msg1"))
-    val props = new Properties()
-    val config = new ProducerConfig(props)
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = new StringEncoder,
-                                                         producerPool = null,
-                                                         populateProducerPool = false,
-                                                         brokerPartitionInfo = getMockBrokerPartitionInfo)
+                                                         producerPool)
     try {
       handler.handle(producerDataList)
       fail("Should fail with NoBrokersForPartitionException")
@@ -266,12 +307,14 @@ class AsyncProducerTest extends JUnitSui
     catch {
       case e: NoBrokersForPartitionException => // expected, do nothing
     }
+    EasyMock.verify(syncProducer)
+    EasyMock.verify(producerPool)
   }
 
   @Test
   def testIncompatibleEncoder() {
     val props = new Properties()
-    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val config = new ProducerConfig(props)
 
     val producer=new Producer[String, String](config)
@@ -286,14 +329,28 @@ class AsyncProducerTest extends JUnitSui
   @Test
   def testRandomPartitioner() {
     val props = new Properties()
-    props.put("broker.list", "0:localhost:9092,1:localhost:9092")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val config = new ProducerConfig(props)
+
+    // create metadata for topic1 and topic2, each with a single partition on one broker
+    val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092)
+    val topic2Metadata = getTopicMetadata("topic2", 0, "localhost", 9092)
+
+    val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata))
+
+    val producerPool = EasyMock.createMock(classOf[ProducerPool])
+    producerPool.getZkClient
+    EasyMock.expectLastCall().andReturn(zkClient)
+    producerPool.addProducers(config)
+    EasyMock.expectLastCall()
+    producerPool.getAnyProducer
+    EasyMock.expectLastCall().andReturn(syncProducer).times(2)
+    EasyMock.replay(producerPool)
+
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = null.asInstanceOf[Encoder[String]],
-                                                         producerPool = null,
-                                                         populateProducerPool = false,
-                                                         brokerPartitionInfo = null)
+                                                         producerPool)
     val producerDataList = new ListBuffer[ProducerData[String,Message]]
     producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes)))
     producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -302,41 +359,51 @@ class AsyncProducerTest extends JUnitSui
     val partitionedData = handler.partitionAndCollate(producerDataList)
     for ((brokerId, dataPerBroker) <- partitionedData) {
       for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
-        assertTrue(partitionId == ProducerRequest.RandomPartition)
+        assertTrue(partitionId == 0)
     }
+    EasyMock.verify(producerPool)
   }
 
   @Test
   def testBrokerListAndAsync() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("producer.type", "async")
+    props.put("batch.size", "5")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+
     val topic = "topic1"
+    val topic1Metadata = getTopicMetadata(topic, 0, "localhost", 9092)
+
     val msgs = TestUtils.getMsgStrings(10)
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
-    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition,
-      messagesToSet(msgs.take(5))))))
+    mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
+    EasyMock.expectLastCall().andReturn(List(topic1Metadata))
+    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, 0, messagesToSet(msgs.take(5))))))
     EasyMock.expectLastCall
-    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition,
-      messagesToSet(msgs.takeRight(5))))))
-    EasyMock.expectLastCall
-    mockSyncProducer.close
+    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, 0, messagesToSet(msgs.takeRight(5))))))
     EasyMock.expectLastCall
     EasyMock.replay(mockSyncProducer)
 
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("producer.type", "async")
-    props.put("batch.size", "5")
-    props.put("broker.list", "0:localhost:9092")
-
-    val config = new ProducerConfig(props)
-    val producerPool = new ProducerPool(config)
-    producerPool.addProducer(0, mockSyncProducer)
+    val producerPool = EasyMock.createMock(classOf[ProducerPool])
+    producerPool.getZkClient
+    EasyMock.expectLastCall().andReturn(zkClient)
+    producerPool.addProducers(config)
+    EasyMock.expectLastCall()
+    producerPool.getAnyProducer
+    EasyMock.expectLastCall().andReturn(mockSyncProducer)
+    producerPool.getProducer(0)
+    EasyMock.expectLastCall().andReturn(mockSyncProducer).times(2)
+    producerPool.close()
+    EasyMock.expectLastCall()
+    EasyMock.replay(producerPool)
 
     val handler = new DefaultEventHandler[String,String](config,
                                                       partitioner = null.asInstanceOf[Partitioner[String]],
                                                       encoder = new StringEncoder,
-                                                      producerPool = producerPool,
-                                                      populateProducerPool = false,
-                                                      brokerPartitionInfo = null)
+                                                      producerPool = producerPool)
 
     val producer = new Producer[String, String](config, handler)
     try {
@@ -349,6 +416,7 @@ class AsyncProducerTest extends JUnitSui
     }
 
     EasyMock.verify(mockSyncProducer)
+    EasyMock.verify(producerPool)
   }
 
   @Test
@@ -380,7 +448,7 @@ class AsyncProducerTest extends JUnitSui
   def testInvalidConfiguration() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", "0:localhost:9092")
+    props.put("broker.list", TestZKUtils.zookeeperConnect)
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
     props.put("producer.type", "async")
 
@@ -398,6 +466,34 @@ class AsyncProducerTest extends JUnitSui
     new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
   }
 
+  private def getSyncProducer(topic: Seq[String], topicMetadata: Seq[TopicMetadata]): SyncProducer = {
+    val syncProducer = EasyMock.createMock(classOf[SyncProducer])
+    topic.zip(topicMetadata).foreach { topicAndMetadata =>
+      syncProducer.send(new TopicMetadataRequest(List(topicAndMetadata._1)))
+      EasyMock.expectLastCall().andReturn(List(topicAndMetadata._2))
+    }
+    EasyMock.replay(syncProducer)
+    syncProducer
+  }
+
+  private def getMockProducerPool(config: ProducerConfig, syncProducer: SyncProducer): ProducerPool = {
+    val producerPool = EasyMock.createMock(classOf[ProducerPool])
+    producerPool.getZkClient
+    EasyMock.expectLastCall().andReturn(zkClient)
+    producerPool.addProducers(config)
+    EasyMock.expectLastCall()
+    producerPool.getAnyProducer
+    EasyMock.expectLastCall().andReturn(syncProducer)
+    EasyMock.replay(producerPool)
+    producerPool
+  }
+
+  private def getTopicMetadata(topic: String, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
+    val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
+    val partition1Metadata = new PartitionMetadata(brokerId, Some(broker1), List(broker1))
+    new TopicMetadata(topic, List(partition1Metadata))
+  }
+
   class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
     override def send(topic: String, messages: ByteBufferMessageSet): Unit = {
       Thread.sleep(1000)

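The mocking strategy used throughout this file is EasyMock's standard
record-replay-verify cycle against the new ProducerPool collaborator. A
condensed, annotated restatement of the getMockProducerPool helper above:

    // record phase: declare the calls DefaultEventHandler is expected to make
    val producerPool = EasyMock.createMock(classOf[ProducerPool])
    producerPool.getZkClient
    EasyMock.expectLastCall().andReturn(zkClient)       // hand back the test's zk client
    producerPool.addProducers(config)
    EasyMock.expectLastCall()                           // void call, expected once
    producerPool.getAnyProducer
    EasyMock.expectLastCall().andReturn(syncProducer)   // serve the stubbed SyncProducer
    EasyMock.replay(producerPool)                       // switch the mock into replay mode

    // ... exercise the handler under test, then assert every expectation was met
    EasyMock.verify(producerPool)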

