kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Fix one flaky test in MetricsTest and improve checks for another
Date Mon, 15 May 2017 13:08:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f56bbb651 -> 50eacb7b4


MINOR: Fix one flaky test in MetricsTest and improve checks for another

* Fix flakiness of `testBrokerTopicMetricsUnregisteredAfterDeletingTopic` by not
consuming messages. Filed KAFKA-5238 to track the issue that metrics for a deleted
topic may be re-created if there are fetch requests in the purgatory.

* Check the log size in `testBrokerTopicMetricsBytesInOut` before attempting to read
the `replicationBytesIn` metric. This helps understand where things have gone wrong
if the metric has not increased (i.e. if it was an issue replicating or with the metric).

* Only remove the replication bytes in/out if the metrics are defined. This should not
affect the behaviour due to the tags, but it makes the code clearer. We've seen some
cases in Jenkins when the metric does not exist and it's still unclear how that can
happen.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3042 from ijuma/more-informative-assertion-for-flaky-metrics-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/50eacb7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/50eacb7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/50eacb7b

Branch: refs/heads/trunk
Commit: 50eacb7b45143dbc26a341620290991428194e4d
Parents: f56bbb6
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon May 15 14:08:18 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon May 15 14:08:18 2017 +0100

----------------------------------------------------------------------
 .../kafka/server/KafkaRequestHandler.scala      |  6 ++--
 .../scala/unit/kafka/metrics/MetricsTest.scala  | 35 +++++++++++++-------
 2 files changed, 27 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/50eacb7b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d1d63f1..8dfbe64 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -131,8 +131,10 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup
{
     removeMetric(BrokerTopicStats.BytesInPerSec, tags)
     removeMetric(BrokerTopicStats.BytesOutPerSec, tags)
     removeMetric(BrokerTopicStats.BytesRejectedPerSec, tags)
-    removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags)
-    removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
+    if (replicationBytesInRate.isDefined)
+      removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags)
+    if (replicationBytesOutRate.isDefined)
+      removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
     removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)

http://git-wip-us.apache.org/repos/asf/kafka/blob/50eacb7b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index c586a54..745fea6 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -35,11 +35,11 @@ import scala.collection.JavaConverters._
 import scala.util.matching.Regex
 import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
 import kafka.log.LogConfig
+import org.apache.kafka.common.TopicPartition
 
 class MetricsTest extends KafkaServerTestHarness with Logging {
   val numNodes = 2
   val numParts = 2
-  val topic = "topic1"
 
   val overridingProps = new Properties
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
@@ -52,6 +52,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   @deprecated("This test has been deprecated and it will be removed in a future release",
"0.10.0.0")
   def testMetricsLeak() {
+    val topic = "test-metrics-leak"
     // create topic topic1 with 1 partition on broker 0
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
     // force creation not client's specific metrics.
@@ -74,19 +75,21 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     AdminUtils.createTopic(zkUtils, topic, 1, 1)
     AdminUtils.deleteTopic(zkUtils, topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
+    assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
   }
 
   @Test
   def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
     val topic = "test-broker-topic-metric"
     AdminUtils.createTopic(zkUtils, topic, 2, 1)
-    createAndShutdownStep(topic, "group0", "consumer0", "producer0")
-    assertTrue("Topic metrics don't exist", checkTopicMetricsExists(topic))
+    // Produce a few messages to create the metrics
+    // Don't consume messages as it may cause metrics to be re-created causing the test to
fail, see KAFKA-5238
+    TestUtils.produceMessages(servers, topic, nMessages)
+    assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
     assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic))
     AdminUtils.deleteTopic(zkUtils, topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
+    assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
   }
 
   @Test
@@ -110,6 +113,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
 
   @Test
   def testBrokerTopicMetricsBytesInOut(): Unit = {
+    val topic = "test-bytes-in-out"
     val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec
     val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec
     val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic"
@@ -121,6 +125,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     // Produce a few messages to create the metrics
     TestUtils.produceMessages(servers, topic, nMessages)
 
+    // Check the log size for each broker so that we can distinguish between failures caused
by replication issues
+    // versus failures caused by the metrics
+    val topicPartition = new TopicPartition(topic, 0)
+    servers.foreach { server =>
+      val log = server.logManager.logsByTopicPartition.get(new TopicPartition(topic, 0))
+      val brokerId = server.config.brokerId
+      val logSize = log.map(_.size)
+      assertTrue(s"Expected broker $brokerId to have a Log for $topicPartition with positive
size, actual: $logSize",
+        logSize.map(_ > 0).getOrElse(false))
+    }
+
     val initialReplicationBytesIn = meterCount(replicationBytesIn)
     val initialReplicationBytesOut = meterCount(replicationBytesOut)
     val initialBytesIn = meterCount(bytesIn)
@@ -151,13 +166,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
       .count
   }
 
-  private def checkTopicMetricsExists(topic: String): Boolean = {
+  private def topicMetricGroups(topic: String): Set[String] = {
     val topicMetricRegex = new Regex(".*BrokerTopicMetrics.*("+topic+")$")
-    val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).entrySet
-    for (metricGroup <- metricGroups.asScala) {
-      if (topicMetricRegex.pattern.matcher(metricGroup.getKey).matches)
-        return true
-    }
-    false
+    val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).keySet.asScala
+    metricGroups.filter(topicMetricRegex.pattern.matcher(_).matches)
   }
 }


Mime
View raw message