kafka-commits mailing list archives

From: rsiva...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-3417: Wrap metric reporter calls in try/catch blocks (#3635)
Date: Mon, 30 Apr 2018 11:34:10 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 902009e  KAFKA-3417: Wrap metric reporter calls in try/catch blocks (#3635)
902009e is described below

commit 902009ea981075bdad178c96eb9e9a835d9cc52f
Author: Mickael Maison <mimaison@users.noreply.github.com>
AuthorDate: Mon Apr 30 12:34:02 2018 +0100

    KAFKA-3417: Wrap metric reporter calls in try/catch blocks (#3635)
    
    Prevent exceptions thrown by metric reporters from impacting request processing and other reporters.
    
    Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
    Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
    
    Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../org/apache/kafka/common/metrics/Metrics.java   |  27 +++--
 .../scala/unit/kafka/server/BaseRequestTest.scala  |  12 +++
 .../KafkaMetricReporterExceptionHandlingTest.scala | 116 +++++++++++++++++++++
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  10 --
 4 files changed, 149 insertions(+), 16 deletions(-)
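
For context, here is a minimal sketch of the failure mode this commit guards against: a reporter whose callbacks throw. This is not part of the commit; ThrowingReporter is a hypothetical class, and the sketch assumes the org.apache.kafka.common.metrics.MetricsReporter interface as shipped in this release. With the change below, the Metrics registry catches such exceptions, logs them against the reporter's class name, and continues with the remaining reporters.

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Hypothetical reporter used only for illustration: every metric callback throws.
public class ThrowingReporter implements MetricsReporter {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public void init(List<KafkaMetric> metrics) {
        // nothing to initialise
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        // Before this commit an exception thrown here propagated out of the Metrics
        // registry to its caller; with it, the error is logged and the next reporter runs.
        throw new RuntimeException("broken reporter: " + metric.metricName());
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        throw new RuntimeException("broken reporter: " + metric.metricName());
    }

    @Override
    public void close() {
        // nothing to release
    }
}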

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index dee69f5..7a8667c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -524,8 +524,13 @@ public class Metrics implements Closeable {
     public synchronized KafkaMetric removeMetric(MetricName metricName) {
         KafkaMetric metric = this.metrics.remove(metricName);
         if (metric != null) {
-            for (MetricsReporter reporter : reporters)
-                reporter.metricRemoval(metric);
+            for (MetricsReporter reporter : reporters) {
+                try {
+                    reporter.metricRemoval(metric);
+                } catch (Exception e) {
+                    log.error("Error when removing metric from " + reporter.getClass().getName(), e);
+                }
+            }
         }
         return metric;
     }
@@ -552,8 +557,13 @@ public class Metrics implements Closeable {
         if (this.metrics.containsKey(metricName))
             throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
         this.metrics.put(metricName, metric);
-        for (MetricsReporter reporter : reporters)
-            reporter.metricChange(metric);
+        for (MetricsReporter reporter : reporters) {
+            try {
+                reporter.metricChange(metric);
+            } catch (Exception e) {
+                log.error("Error when registering metric on " + reporter.getClass().getName(), e);
+            }
+        }
     }
 
     /**
@@ -634,8 +644,13 @@ public class Metrics implements Closeable {
             }
         }
 
-        for (MetricsReporter reporter : this.reporters)
-            reporter.close();
+        for (MetricsReporter reporter : reporters) {
+            try {
+                reporter.close();
+            } catch (Exception e) {
+                log.error("Error when closing " + reporter.getClass().getName(), e);
+            }
+        }
     }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index f91afd4..99355bc 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -155,6 +155,18 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
   }
 
   /**
+   * Sends a request built by the builder, waits for the response and parses it 
+   */
+  def requestResponse(socket: Socket, clientId: String, correlationId: Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
+    val apiKey = requestBuilder.apiKey
+    val request = requestBuilder.build()
+    val header = new RequestHeader(apiKey, request.version, clientId, correlationId)
+    val response = requestAndReceive(socket, request.serialize(header).array)
+    val responseBuffer = skipResponseHeader(response)
+    apiKey.parseResponse(request.version, responseBuffer)
+  }
+  
+  /**
     * Serializes and sends the requestStruct to the given api.
     * A ByteBuffer containing the response (without the response header) is returned.
     */
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
new file mode 100644
index 0000000..30f3b23
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
@@ -0,0 +1,116 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.server
+
+import java.net.Socket
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.{ListGroupsRequest,ListGroupsResponse}
+import org.apache.kafka.common.metrics.MetricsReporter
+import org.apache.kafka.common.metrics.KafkaMetric
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.protocol.Errors
+
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.junit.After
+import java.util.concurrent.atomic.AtomicInteger
+
+/*
+ * this test checks that a reporter that throws an exception will not affect other reporters
+ * and will not affect the broker's message handling
+ */
+class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
+
+  override def numBrokers: Int = 1
+
+  override def propertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.MetricReporterClassesProp, classOf[KafkaMetricReporterExceptionHandlingTest.BadReporter].getName + "," + classOf[KafkaMetricReporterExceptionHandlingTest.GoodReporter].getName)
+  }
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    // need a quota prop to register a "throttle-time" metrics after server startup
+    val quotaProps = new Properties()
+    quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.1")
+    adminZkClient.changeClientIdConfig("<default>", quotaProps)
+  }
+
+  @After
+  override def tearDown() {
+    KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.set(0)
+    KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.set(0)
+    
+    super.tearDown()
+  }
+
+  @Test
+  def testBothReportersAreInvoked() {
+    val port = anySocketServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+    val socket = new Socket("localhost", port)
+    socket.setSoTimeout(10000)
+
+    try {
+      TestUtils.retry(10000) {
+        val error = new ListGroupsResponse(requestResponse(socket, "clientId", 0, new ListGroupsRequest.Builder())).error()
+        assertEquals(Errors.NONE, error)
+        assertEquals(KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.get, KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.get)
+        assertTrue(KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.get > 0)
+      }
+    } finally {
+      socket.close()
+    }
+  }
+}
+
+object KafkaMetricReporterExceptionHandlingTest {
+  var goodReporterRegistered = new AtomicInteger
+  var badReporterRegistered = new AtomicInteger
+
+  class GoodReporter extends MetricsReporter {
+
+    def configure(configs: java.util.Map[String, _]) {
+    }
+
+    def init(metrics: java.util.List[KafkaMetric]) {
+    }
+
+    def metricChange(metric: KafkaMetric) {
+      if (metric.metricName.group == "Request") {
+        goodReporterRegistered.incrementAndGet
+      }
+    }
+
+    def metricRemoval(metric: KafkaMetric) {
+    }
+
+    def close() {
+    }
+  }
+
+  class BadReporter extends GoodReporter {
+
+    override def metricChange(metric: KafkaMetric) {
+      if (metric.metricName.group == "Request") {
+        badReporterRegistered.incrementAndGet
+        throw new RuntimeException(metric.metricName.toString)
+      }
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ed85415..8a50fca 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -14,7 +14,6 @@
 
 package kafka.server
 
-import java.net.Socket
 import java.nio.ByteBuffer
 import java.util.{Collections, LinkedHashMap, Properties}
 import java.util.concurrent.{Executors, Future, TimeUnit}
@@ -331,15 +330,6 @@ class RequestQuotaTest extends BaseRequestTest {
     }
   }
 
-  private def requestResponse(socket: Socket, clientId: String, correlationId: Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
-    val apiKey = requestBuilder.apiKey
-    val request = requestBuilder.build()
-    val header = new RequestHeader(apiKey, request.version, clientId, correlationId)
-    val response = requestAndReceive(socket, request.serialize(header).array)
-    val responseBuffer = skipResponseHeader(response)
-    apiKey.parseResponse(request.version, responseBuffer)
-  }
-
   case class Client(clientId: String, apiKey: ApiKeys) {
     var correlationId: Int = 0
     val builder = requestBuilder(apiKey)
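
For completeness, a hedged usage sketch (also not part of this commit) of how such a reporter reaches the Metrics registry in practice: it is listed under the standard metric.reporters client setting. ThrowingReporter is the hypothetical class sketched above, and the bootstrap address and topic name are placeholders. The point is that producer construction and normal traffic proceed even though every callback of one reporter throws.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ThrowingReporterDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // The client's Metrics registry instantiates every class listed here; with this
        // commit, exceptions from ThrowingReporter are logged instead of propagated.
        props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ThrowingReporter.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
        }
    }
}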

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.
