kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-1416; Unify sendMessages in TestUtils; reviewed by Guozhang Wang
Date Tue, 14 Apr 2015 21:42:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 157fba840 -> 9e5d481c7


KAFKA-1416; Unify sendMessages in TestUtils; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e5d481c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e5d481c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e5d481c

Branch: refs/heads/trunk
Commit: 9e5d481c7ccb5ef82196b7093f3b916194bdd90d
Parents: 157fba8
Author: Flutra Osmani <flutra@gmail.com>
Authored: Tue Apr 14 14:42:53 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Apr 14 14:42:53 2015 -0700

----------------------------------------------------------------------
 .../ZookeeperConsumerConnectorTest.scala        |  58 ++++-----
 .../unit/kafka/integration/FetcherTest.scala    |  19 +--
 .../integration/UncleanLeaderElectionTest.scala |  45 +++----
 .../ZookeeperConsumerConnectorTest.scala        |  54 +++-----
 .../scala/unit/kafka/metrics/MetricsTest.scala  |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 122 +++++++++++--------
 6 files changed, 139 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index f3e76db..7f9fca3 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -79,7 +79,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // also the iterator should support re-entrant, so loop it twice
     for (i <- 0 until 2) {
       try {
-        getMessages(nMessages*2, topicMessageStreams0)
+        getMessages(topicMessageStreams0, nMessages * 2)
         fail("should get an exception")
       } catch {
         case e: ConsumerTimeoutException => // this is ok
@@ -90,8 +90,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     zkConsumerConnector0.shutdown
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
-                        sendMessagesToPartition(servers, topic, 1, nMessages)
+    val sentMessages1 = sendMessages(servers, topic, nMessages, 0) ++
+      sendMessages(servers, topic, nMessages, 1)
 
     // wait to make sure the topic and partition have a leader for the successful case
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
@@ -105,7 +105,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic ->
1), new StringDecoder(), new StringDecoder())
 
-    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+    val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2)
     assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
 
     // also check partition ownership
@@ -124,13 +124,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic ->
1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
-                        sendMessagesToPartition(servers, topic, 1, nMessages)
+    val sentMessages2 = sendMessages(servers, topic, nMessages, 0) ++
+                         sendMessages(servers, topic, nMessages, 1)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
 
-    val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages,
topicMessageStreams2)
+    val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2,
nMessages)
     assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
 
     // also check partition ownership
@@ -145,13 +145,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String,
Int]())
     // send some messages to each broker
-    val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages) ++
-                        sendMessagesToPartition(servers, topic, 1, nMessages)
+    val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
+                        sendMessages(servers, topic, nMessages, 1)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
 
-    val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages,
topicMessageStreams2)
+    val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2,
nMessages)
     assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
 
     // also check partition ownership
@@ -179,8 +179,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec)
++
-                        sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages1 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec)
++
+                        sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@@ -193,7 +193,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic ->
1), new StringDecoder(), new StringDecoder())
-    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+    val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2)
     assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
 
     // also check partition ownership
@@ -212,13 +212,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic ->
1), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec)
++
-                        sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages2 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec)
++
+                        sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
 
-    val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages,
topicMessageStreams2)
+    val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2,
nMessages)
     assertEquals(sentMessages2.sorted, receivedMessages2.sorted)
 
     // also check partition ownership
@@ -233,13 +233,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String,
Int](), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
-    val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec)
++
-                        sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec)
+    val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec)
++
+                        sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
 
-    val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages,
topicMessageStreams2)
+    val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2,
nMessages)
     assertEquals(sentMessages3.sorted, receivedMessages3.sorted)
 
     // also check partition ownership
@@ -255,8 +255,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def testCompressionSetConsumption() {
     // send some messages to each broker
-    val sentMessages = sendMessagesToPartition(servers, topic, 0, 200, DefaultCompressionCodec)
++
-                       sendMessagesToPartition(servers, topic, 1, 200, DefaultCompressionCodec)
+    val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++
+                       sendMessages(servers, topic, 200, 1, DefaultCompressionCodec)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -264,7 +264,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect,
group, consumer0))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic ->
1), new StringDecoder(), new StringDecoder())
-    val receivedMessages = getMessages(400, topicMessageStreams1)
+    val receivedMessages = getMessages(topicMessageStreams1, 400)
     assertEquals(sentMessages.sorted, receivedMessages.sorted)
 
     // also check partition ownership
@@ -281,8 +281,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages = sendMessagesToPartition(servers, topic, 0, nMessages, NoCompressionCodec)
++
-                       sendMessagesToPartition(servers, topic, 1, nMessages, NoCompressionCodec)
+    val sentMessages = sendMessages(servers, topic, nMessages, 0, NoCompressionCodec) ++
+                       sendMessages(servers, topic, nMessages, 1, NoCompressionCodec)
 
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@@ -322,7 +322,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessages(servers, topic, "producer1", nMessages, "batch1", NoCompressionCodec,
1)
+    val sentMessages1 = sendMessages(servers, topic, nMessages)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect,
group, consumer1))
@@ -340,7 +340,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val expected_1 = List( ("0", "group1_consumer1-0"))
     assertEquals(expected_1, actual_1)
 
-    val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages1 = getMessages(topicMessageStreams1, nMessages)
     assertEquals(sentMessages1, receivedMessages1)
     zkConsumerConnector1.shutdown()
     zkClient.close()
@@ -348,8 +348,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def testConsumerRebalanceListener() {
     // Send messages to create topic
-    sendMessagesToPartition(servers, topic, 0, nMessages)
-    sendMessagesToPartition(servers, topic, 1, nMessages)
+    sendMessages(servers, topic, nMessages, 0)
+    sendMessages(servers, topic, nMessages, 1)
 
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect,
group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -385,7 +385,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic ->
1), new StringDecoder(), new StringDecoder())
 
     // Consume messages from consumer 1 to make sure it has finished rebalance
-    getMessages(nMessages, topicMessageStreams1)
+    getMessages(topicMessageStreams1, nMessages)
 
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_2 = List(("0", "group1_consumer1-0"),

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 0dc837a..facebd8 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -66,32 +66,17 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
   def testFetcher() {
     val perNode = 2
-    var count = sendMessages(perNode)
+    var count = TestUtils.sendMessages(servers, topic, perNode).size
 
     fetch(count)
     assertQueueEmpty()
-    count = sendMessages(perNode)
+    count = TestUtils.sendMessages(servers, topic, perNode).size
     fetch(count)
     assertQueueEmpty()
   }
 
   def assertQueueEmpty(): Unit = assertEquals(0, queue.size)
 
-  def sendMessages(messagesPerNode: Int): Int = {
-    var count = 0
-    for(conf <- configs) {
-      val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
-        TestUtils.getBrokerListStrFromServers(servers),
-        keyEncoder = classOf[StringEncoder].getName)
-      val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
-      messages += conf.brokerId -> ms
-      producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
-      producer.close()
-      count += ms.size
-    }
-    count
-  }
-
   def fetch(expected: Int) {
     var count = 0
     while(true) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index a130089..5b7b529 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -29,7 +29,7 @@ import kafka.admin.AdminUtils
 import kafka.common.FailedToSendMessageException
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
 import kafka.producer.{KeyedMessage, Producer}
-import kafka.serializer.StringEncoder
+import kafka.serializer.StringDecoder
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.CoreUtils
 import kafka.utils.TestUtils._
@@ -175,14 +175,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness
{
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
-    produceMessage(topic, "first")
+    sendMessage(servers, topic, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
-    produceMessage(topic, "second")
+    sendMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
     // shutdown leader and then restart follower
@@ -192,7 +192,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness
{
     // wait until new leader is (uncleanly) elected
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
 
-    produceMessage(topic, "third")
+    sendMessage(servers, topic, "third")
 
     // second message was lost due to unclean election
     assertEquals(List("first", "third"), consumeAllMessages(topic))
@@ -210,14 +210,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness
{
     val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
     debug("Follower for " + topic  + " is: %s".format(followerId))
 
-    produceMessage(topic, "first")
+    sendMessage(servers, topic, "first")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     assertEquals(List("first"), consumeAllMessages(topic))
 
     // shutdown follower server
     servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
 
-    produceMessage(topic, "second")
+    sendMessage(servers, topic, "second")
     assertEquals(List("first", "second"), consumeAllMessages(topic))
 
     // shutdown leader and then restart follower
@@ -229,7 +229,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness
{
 
     // message production and consumption should both fail while leader is down
     intercept[FailedToSendMessageException] {
-      produceMessage(topic, "third")
+      sendMessage(servers, topic, "third")
     }
     assertEquals(List.empty[String], consumeAllMessages(topic))
 
@@ -237,7 +237,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness
{
     servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId))
 
-    produceMessage(topic, "third")
+    sendMessage(servers, topic, "third")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
 
@@ -253,33 +253,16 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness
{
     server.awaitShutdown()
   }
 
-  private def produceMessage(topic: String, message: String) = {
-    val producer: Producer[String, Array[Byte]] = createProducer(
-      getBrokerListStrFromServers(servers),
-      keyEncoder = classOf[StringEncoder].getName)
-    producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
-    producer.close()
-  }
-
   private def consumeAllMessages(topic: String) : List[String] = {
     // use a fresh consumer group every time so that we don't need to mess with disabling
auto-commit or
     // resetting the ZK offset
     val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id",
1000)
     val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
-    val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head
-
-    val messages = new MutableList[String]
-    val iter = messageStream.iterator
-    try {
-      while(iter.hasNext()) {
-        messages += new String(iter.next.message) // will throw a timeout exception if the
message isn't there
-      }
-    } catch {
-      case e: ConsumerTimeoutException =>
-        debug("consumer timed out after receiving " + messages.length + " message(s).")
-    } finally {
-      consumerConnector.shutdown
-    }
-    messages.toList
+    val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(),
new StringDecoder())
+
+    val messages = getMessages(messageStream)
+    consumerConnector.shutdown
+
+    messages
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index ad66bb2..74c761d 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -60,7 +60,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     TestUtils.createTopic(zkClient, topic, numParts, 1, servers)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessages(nMessages, "batch1")
+    val sentMessages1 = sendMessages(servers, nMessages, "batch1")
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect,
group, consumer1))
@@ -82,32 +82,24 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def sendMessages(conf: KafkaConfig,
+  def sendMessages(servers: Seq[KafkaServer],
                    messagesPerNode: Int,
-                   header: String,
-                   compressed: CompressionCodec): List[String] = {
+                   header: String): List[String] = {
     var messages: List[String] = Nil
-    val producer: kafka.producer.Producer[Int, String] =
-      TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-        encoder = classOf[StringEncoder].getName,
-        keyEncoder = classOf[IntEncoder].getName)
-    val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
-    for (partition <- 0 until numParts) {
-      val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition
+ "-" + x)
-      messages ++= ms
-      import JavaConversions._
-      javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int,
String]])
+    for(server <- servers) {
+      val producer: kafka.producer.Producer[Int, String] =
+        TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
+          encoder = classOf[StringEncoder].getName,
+          keyEncoder = classOf[IntEncoder].getName)
+      val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer)
+      for (partition <- 0 until numParts) {
+        val ms = 0.until(messagesPerNode).map(x => header + server.config.brokerId + "-"
+ partition + "-" + x)
+        messages ++= ms
+        import JavaConversions._
+        javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int,
String]])
+      }
+      javaProducer.close
     }
-    javaProducer.close
-    messages
-  }
-
-  def sendMessages(messagesPerNode: Int,
-                   header: String,
-                   compressed: CompressionCodec = NoCompressionCodec): List[String] = {
-    var messages: List[String] = Nil
-    for(conf <- configs)
-      messages ++= sendMessages(conf, messagesPerNode, header, compressed)
     messages
   }
 
@@ -115,18 +107,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                   jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String,
String]]]): List[String] = {
     var messages: List[String] = Nil
     import scala.collection.JavaConversions._
-    val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String,
String]]] = jTopicMessageStreams
-    for ((topic, messageStreams) <- topicMessageStreams) {
-      for (messageStream <- messageStreams) {
-        val iterator = messageStream.iterator
-        for (i <- 0 until nMessagesPerThread) {
-          assertTrue(iterator.hasNext)
-          val message = iterator.next.message
-          messages ::= message
-          debug("received message: " + message)
-        }
-      }
-    }
+    val topicMessageStreams = jTopicMessageStreams.mapValues(_.toList)
+    messages = TestUtils.getMessages(topicMessageStreams, nMessagesPerThread)
     messages
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 247a6e9..b42101b 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -77,12 +77,12 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with
Logging {
   }
 
   def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit
= {
-    val sentMessages1 = sendMessages(servers, topic, producerId, nMessages, "batch1", NoCompressionCodec,
1)
+    val sentMessages1 = sendMessages(servers, topic, nMessages)
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect,
group, consumerId))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic ->
1), new StringDecoder(), new StringDecoder())
-    val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages1 = getMessages(topicMessageStreams1, nMessages)
 
     zkConsumerConnector1.shutdown()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5a9e84d..8dc99b6 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -36,7 +36,7 @@ import kafka.producer._
 import kafka.message._
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.consumer.{KafkaStream, ConsumerConfig}
+import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig}
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 import kafka.common.TopicAndPartition
 import kafka.admin.AdminUtils
@@ -745,71 +745,97 @@ object TestUtils extends Logging {
                    time = time,
                    brokerState = new BrokerState())
   }
-
-  def sendMessagesToPartition(servers: Seq[KafkaServer],
-                              topic: String,
-                              partition: Int,
-                              numMessages: Int,
-                              compression: CompressionCodec = NoCompressionCodec): List[String]
= {
+  def sendMessages(servers: Seq[KafkaServer],
+                   topic: String,
+                   numMessages: Int,
+                   partition: Int = -1,
+                   compression: CompressionCodec = NoCompressionCodec): List[String] = {
     val header = "test-%d".format(partition)
     val props = new Properties()
     props.put("compression.codec", compression.codec.toString)
-    val producer: Producer[Int, String] =
-      createProducer(TestUtils.getBrokerListStrFromServers(servers),
-        encoder = classOf[StringEncoder].getName,
-        keyEncoder = classOf[IntEncoder].getName,
-        partitioner = classOf[FixedValuePartitioner].getName,
-        producerProps = props)
-
     val ms = 0.until(numMessages).map(x => header + "-" + x)
-    producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
-    debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
-    producer.close()
-    ms.toList
-  }
 
-  def sendMessages(servers: Seq[KafkaServer],
-                   topic: String,
-                   producerId: String,
-                   messagesPerNode: Int,
-                   header: String,
-                   compression: CompressionCodec,
-                   numParts: Int): List[String]= {
-    var messages: List[String] = Nil
-    val props = new Properties()
-    props.put("compression.codec", compression.codec.toString)
-    props.put("client.id", producerId)
-    val   producer: Producer[Int, String] =
-      createProducer(brokerList = TestUtils.getBrokerListStrFromServers(servers),
-        encoder = classOf[StringEncoder].getName,
-        keyEncoder = classOf[IntEncoder].getName,
-        partitioner = classOf[FixedValuePartitioner].getName,
-        producerProps = props)
+    // Specific Partition
+    if (partition >= 0) {
+      val producer: Producer[Int, String] =
+        createProducer(TestUtils.getBrokerListStrFromServers(servers),
+          encoder = classOf[StringEncoder].getName,
+          keyEncoder = classOf[IntEncoder].getName,
+          partitioner = classOf[FixedValuePartitioner].getName,
+          producerProps = props)
 
-    for (partition <- 0 until numParts) {
-      val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x)
       producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
-      messages ++= ms
       debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
+      producer.close()
+      ms.toList
+    } else {
+      // Use topic as the key to determine partition
+      val producer: Producer[String, String] = createProducer(
+        TestUtils.getBrokerListStrFromServers(servers),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[StringEncoder].getName,
+        partitioner = classOf[DefaultPartitioner].getName,
+        producerProps = props)
+      producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
+      producer.close()
+      debug("Sent %d messages for topic [%s]".format(ms.size, topic))
+      ms.toList
     }
+
+  }
+
+  def sendMessage(servers: Seq[KafkaServer],
+                  topic: String,
+                  message: String) = {
+
+    val producer: Producer[String, String] =
+      createProducer(TestUtils.getBrokerListStrFromServers(servers),
+        encoder = classOf[StringEncoder].getName(),
+        keyEncoder = classOf[StringEncoder].getName())
+
+    producer.send(new KeyedMessage[String, String](topic, topic, message))
     producer.close()
-    messages
   }
 
-  def getMessages(nMessagesPerThread: Int,
-                  topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String]
= {
+  /**
+   * Consume all messages (or a specific number of messages)
+   * @param topicMessageStreams the Topic Message Streams
+   * @param nMessagesPerThread an optional field to specify the exact number of messages
to be returned.
+   *                           ConsumerTimeoutException will be thrown if there are no messages
to be consumed.
+   *                           If not specified, then all available messages will be consumed,
and no exception is thrown.
+   *
+   *
+   * @return the list of messages consumed.
+   */
+  def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]],
+                     nMessagesPerThread: Int = -1): List[String] = {
+
     var messages: List[String] = Nil
+    val shouldGetAllMessages = nMessagesPerThread < 0
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
-        val iterator = messageStream.iterator
-        for (i <- 0 until nMessagesPerThread) {
-          assertTrue(iterator.hasNext)
-          val message = iterator.next.message
-          messages ::= message
-          debug("received message: " + message)
+        val iterator = messageStream.iterator()
+        try {
+          var i = 0
+          while ((shouldGetAllMessages && iterator.hasNext()) || (i < nMessagesPerThread))
{
+            assertTrue(iterator.hasNext)
+            val message = iterator.next.message // will throw a timeout exception if the
message isn't there
+            messages ::= message
+            debug("received message: " + message)
+            i += 1
+          }
+        } catch {
+          case e: ConsumerTimeoutException =>
+            if (shouldGetAllMessages) {
+              // swallow the exception
+              debug("consumer timed out after receiving " + messages.length + " message(s).")
+            } else {
+              throw e
+            }
         }
       }
     }
+
     messages.reverse
   }
 


Mime
View raw message