kafka-commits mailing list archives

From jun...@apache.org
Subject [1/2] kafka git commit: kafka-1481; Stop using dashes AND underscores as separators in MBean names; patched by Vladimir Tretyakov; reviewed by Joel Koshy and Jun Rao
Date Tue, 18 Nov 2014 02:35:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 7e1907e32 -> 117a02de0


http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
new file mode 100644
index 0000000..3cf23b3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import com.yammer.metrics.Metrics
+import junit.framework.Assert._
+import kafka.integration.KafkaServerTestHarness
+import kafka.server._
+import scala.collection._
+import org.scalatest.junit.JUnit3Suite
+import kafka.message._
+import kafka.serializer._
+import kafka.utils._
+import kafka.utils.TestUtils._
+
+class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+  val zookeeperConnect = TestZKUtils.zookeeperConnect
+  val numNodes = 2
+  val numParts = 2
+  val topic = "topic1"
+  val configs =
+    for (props <- TestUtils.createBrokerConfigs(numNodes))
+    yield new KafkaConfig(props) {
+      override val zkConnect = zookeeperConnect
+      override val numPartitions = numParts
+    }
+  val nMessages = 2
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  def testMetricsLeak() {
+    // create topic topic1 with 1 partition on broker 0
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    // force creation of the metrics that are not client-specific, so they are counted in the baseline
+    createAndShutdownStep("group0", "consumer0", "producer0")
+
+    val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size
+
+    for (i <- 0 to 5) {
+      createAndShutdownStep("group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
+      assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
+    }
+  }
+
+  def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
+    val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1)
+    // create a consumer
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+    val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
+
+    zkConsumerConnector1.shutdown()
+  }
+}
\ No newline at end of file
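
The leak check in testMetricsLeak hinges on Yammer Metrics keeping one process-wide default registry: every client registers its metrics there, and a clean shutdown must unregister them again. A minimal standalone sketch of that pattern (the registry calls are the same ones the test uses; the surrounding names are illustrative):

    import com.yammer.metrics.Metrics

    // Snapshot the number of registered metrics before exercising a client.
    val baseline = Metrics.defaultRegistry().allMetrics().keySet().size

    // ... create a producer/consumer here and shut it down ...

    // A clean shutdown unregisters the client's metrics,
    // so the registry returns to its baseline size.
    val current = Metrics.defaultRegistry().allMetrics().keySet().size
    assert(current == baseline, "leaked %d metric(s)".format(current - baseline))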

http://git-wip-us.apache.org/repos/asf/kafka/blob/117a02de/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index dd3640f..0da774d 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,7 +26,6 @@ import java.util.Properties
 
 import org.apache.kafka.common.utils.Utils._
 
-import collection.mutable.Map
 import collection.mutable.ListBuffer
 
 import org.I0Itec.zkclient.ZkClient
@@ -36,7 +35,7 @@ import kafka.producer._
 import kafka.message._
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.consumer.ConsumerConfig
+import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 import kafka.common.TopicAndPartition
 import kafka.admin.AdminUtils
@@ -47,6 +46,8 @@ import junit.framework.AssertionFailedError
 import junit.framework.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
 
+import scala.collection.Map
+
 /**
  * Utility functions to help with testing
  */
@@ -483,7 +484,7 @@ object TestUtils extends Logging {
     val data = topics.flatMap(topic =>
       partitions.map(partition => (TopicAndPartition(topic,  partition), message))
     )
-    new ProducerRequest(correlationId, clientId, acks.toShort, timeout, Map(data:_*))
+    new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
   }
 
   def makeLeaderForPartition(zkClient: ZkClient, topic: String,
@@ -720,6 +721,73 @@ object TestUtils extends Logging {
       time = time,
       brokerState = new BrokerState())
   }
+
+  def sendMessagesToPartition(configs: Seq[KafkaConfig],
+                              topic: String,
+                              partition: Int,
+                              numMessages: Int,
+                              compression: CompressionCodec = NoCompressionCodec): List[String] = {
+    val header = "test-%d".format(partition)
+    val props = new Properties()
+    props.put("compression.codec", compression.codec.toString)
+    val producer: Producer[Int, String] =
+      createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName,
+        partitioner = classOf[FixedValuePartitioner].getName,
+        producerProps = props)
+
+    val ms = 0.until(numMessages).map(x => header + "-" + x)
+    producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+    debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
+    producer.close()
+    ms.toList
+  }
+
+  def sendMessages(configs: Seq[KafkaConfig],
+                   topic: String,
+                   producerId: String,
+                   messagesPerNode: Int,
+                   header: String,
+                   compression: CompressionCodec,
+                   numParts: Int): List[String] = {
+    var messages: List[String] = Nil
+    val props = new Properties()
+    props.put("compression.codec", compression.codec.toString)
+    props.put("client.id", producerId)
+    val producer: Producer[Int, String] =
+      createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[IntEncoder].getName,
+        partitioner = classOf[FixedValuePartitioner].getName,
+        producerProps = props)
+
+    for (partition <- 0 until numParts) {
+      val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x)
+      producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
+      messages ++= ms
+      debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
+    }
+    producer.close()
+    messages
+  }
+
+  def getMessages(nMessagesPerThread: Int,
+                  topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = {
+    var messages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessagesPerThread) {
+          assertTrue(iterator.hasNext)
+          val message = iterator.next.message
+          messages ::= message
+          debug("received message: " + message)
+        }
+      }
+    }
+    messages.reverse
+  }
 }
 
 object TestZKUtils {
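
The new helpers wrap the full produce/consume round trip that MetricsTest drives: sendMessages pushes a fixed batch through a sync producer, and getMessages drains the high-level consumer's stream iterators, asserting each stream yields the expected count. A sketch of how they compose (assuming the configs and zkConnect fields from MetricsTest above; the topic, group and client ids are illustrative):

    // Produce two messages into the single partition of "topic1".
    val sent = sendMessages(configs, "topic1", "producer0", 2, "batch1", NoCompressionCodec, 1)

    // Attach a high-level consumer with one stream for the topic.
    val consumerConfig = new ConsumerConfig(createConsumerProperties(zkConnect, "group0", "consumer0"))
    val connector = new ZookeeperConsumerConnector(consumerConfig, true)
    val streams = connector.createMessageStreams(Map("topic1" -> 1), new StringDecoder(), new StringDecoder())

    // getMessages asserts each stream delivers exactly the expected count.
    val received = getMessages(2, streams)
    assertEquals(sent.sorted, received.sorted)
    connector.shutdown()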

