kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/2] kafka git commit: kafka-1481; Stop using dashes AND underscores as separators in MBean names; patched by Vladimir Tretyakov; reviewed by Joel Koshy and Jun Rao
Date Thu, 20 Nov 2014 01:58:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1c7d783dd -> 457744a82


http://git-wip-us.apache.org/repos/asf/kafka/blob/457744a8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index dd3640f..0da774d 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,7 +26,6 @@ import java.util.Properties
 
 import org.apache.kafka.common.utils.Utils._
 
-import collection.mutable.Map
 import collection.mutable.ListBuffer
 
 import org.I0Itec.zkclient.ZkClient
@@ -36,7 +35,7 @@ import kafka.producer._
 import kafka.message._
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.consumer.ConsumerConfig
+import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 import kafka.common.TopicAndPartition
 import kafka.admin.AdminUtils
@@ -47,6 +46,8 @@ import junit.framework.AssertionFailedError
 import junit.framework.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
 
+import scala.collection.Map
+
 /**
  * Utility functions to help with testing
  */
@@ -483,7 +484,7 @@ object TestUtils extends Logging {
     val data = topics.flatMap(topic =>
       partitions.map(partition => (TopicAndPartition(topic,  partition), message))
     )
-    new ProducerRequest(correlationId, clientId, acks.toShort, timeout, Map(data:_*))
+    new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
   }
 
   def makeLeaderForPartition(zkClient: ZkClient, topic: String,
@@ -720,6 +721,73 @@ object TestUtils extends Logging {
       time = time,
       brokerState = new BrokerState())
   }
+
+  def sendMessagesToPartition(configs: Seq[KafkaConfig],
+                              topic: String,
+                              partition: Int,
+                              numMessages: Int,
+                              compression: CompressionCodec = NoCompressionCodec): List[String] = {
+    val header = "test-%d".format(partition)
+    val props = new Properties()
+    props.put("compression.codec", compression.codec.toString)
+    val producer: Producer[Int, String] =
+      createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName,
+        partitioner = classOf[FixedValuePartitioner].getName,
+        producerProps = props)
+
+    val ms = 0.until(numMessages).map(x => header + "-" + x)
+    producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+    debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
+    producer.close()
+    ms.toList
+  }
+
+  def sendMessages(configs: Seq[KafkaConfig],
+                   topic: String,
+                   producerId: String,
+                   messagesPerNode: Int,
+                   header: String,
+                   compression: CompressionCodec,
+                   numParts: Int): List[String]= {
+    var messages: List[String] = Nil
+    val props = new Properties()
+    props.put("compression.codec", compression.codec.toString)
+    props.put("client.id", producerId)
+    val   producer: Producer[Int, String] =
+      createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName,
+        partitioner = classOf[FixedValuePartitioner].getName,
+        producerProps = props)
+
+    for (partition <- 0 until numParts) {
+      val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x)
+      producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+      messages ++= ms
+      debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
+    }
+    producer.close()
+    messages
+  }
+
+  def getMessages(nMessagesPerThread: Int,
+                  topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = {
+    var messages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessagesPerThread) {
+          assertTrue(iterator.hasNext)
+          val message = iterator.next.message
+          messages ::= message
+          debug("received message: " + message)
+        }
+      }
+    }
+    messages.reverse
+  }
 }
 
 object TestZKUtils {


Mime
View raw message