kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6243: Enable dynamic updates of broker metrics reporters (#4464)
Date Tue, 30 Jan 2018 20:55:39 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70d3b19  KAFKA-6243: Enable dynamic updates of broker metrics reporters (#4464)
70d3b19 is described below

commit 70d3b19b1126ae64c214615adef590275f218ee7
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Tue Jan 30 12:55:32 2018 -0800

    KAFKA-6243: Enable dynamic updates of broker metrics reporters (#4464)
    
    Dynamic metrics reporter updates described in KIP-226. This includes:
      - Addition and removal of metrics reporters
      - Reconfiguration of custom metrics reporter configs
      - Tests for metrics reporter updates at default cluster-level and as per-broker config
for testing
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../apache/kafka/common/config/AbstractConfig.java |  21 ++-
 .../org/apache/kafka/common/metrics/Metrics.java   |   9 +
 .../java/org/apache/kafka/common/utils/Utils.java  |  15 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  13 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 103 +++++++++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  29 ++-
 .../server/DynamicBrokerReconfigurationTest.scala  | 203 +++++++++++++++++++--
 7 files changed, 343 insertions(+), 50 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 9e32074..427c492 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -288,11 +288,26 @@ public class AbstractConfig {
      * @return The list of configured instances
      */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t, Map<String,
Object> configOverrides) {
-        List<String> klasses = getList(key);
-        List<T> objects = new ArrayList<>();
+        return getConfiguredInstances(getList(key), t, configOverrides);
+    }
+
+
+    /**
+     * Get a list of configured instances of the given class specified by the given configuration
key. The configuration
+     * may specify either null or an empty string to indicate no configured instances. In
both cases, this method
+     * returns an empty list to indicate no configured instances.
+     * @param classNames The list of class names of the instances to create
+     * @param t The interface the class should implement
+     * @param configOverrides Configuration overrides to use.
+     * @return The list of configured instances
+     */
+    public <T> List<T> getConfiguredInstances(List<String> classNames,
Class<T> t, Map<String, Object> configOverrides) {
+        List<T> objects = new ArrayList<T>();
+        if (classNames == null)
+            return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : klasses) {
+        for (Object klass : classNames) {
             Object o;
             if (klass instanceof String) {
                 try {
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 68e1f47..7f9fb9d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -526,6 +526,15 @@ public class Metrics implements Closeable {
         this.reporters.add(reporter);
     }
 
+    /**
+     * Remove a MetricReporter
+     */
+    public synchronized void removeReporter(MetricsReporter reporter) {
+        if (this.reporters.remove(reporter)) {
+            reporter.close();
+        }
+    }
+
     synchronized void registerMetric(KafkaMetric metric) {
         MetricName metricName = metric.metricName();
         if (this.metrics.containsKey(metricName))
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index dff6107..c7a654a 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -304,11 +304,22 @@ public final class Utils {
      * Look up the class by name and instantiate it.
      * @param klass class name
      * @param base super class of the class to be instantiated
-     * @param <T>
+     * @param <T> the type of the base class
      * @return the new instance
      */
     public static <T> T newInstance(String klass, Class<T> base) throws ClassNotFoundException
{
-        return Utils.newInstance(Class.forName(klass, true, Utils.getContextOrKafkaClassLoader()).asSubclass(base));
+        return Utils.newInstance(loadClass(klass, base));
+    }
+
+    /**
+     * Look up a class by name.
+     * @param klass class name
+     * @param base super class of the class for verification
+     * @param <T> the type of the base class
+     * @return the new class
+     */
+    public static <T> Class<? extends T> loadClass(String klass, Class<T>
base) throws ClassNotFoundException {
+        return Class.forName(klass, true, Utils.getContextOrKafkaClassLoader()).asSubclass(base);
     }
 
     /**
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index e013cfb..063f443 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -32,6 +32,7 @@ import kafka.utils._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
@@ -154,14 +155,22 @@ class LogCleaner(initialConfig: CleanerConfig,
     cleaners.clear()
   }
 
-  override def reconfigurableConfigs(): Set[String] = {
+  override def reconfigurableConfigs: Set[String] = {
     LogCleaner.ReconfigurableConfigs
   }
 
-  override def validateReconfiguration(newConfig: KafkaConfig): Boolean = {
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     val newCleanerConfig = LogCleaner.cleanerConfig(newConfig)
     val numThreads = newCleanerConfig.numThreads
     numThreads >= 1 && numThreads >= config.numThreads / 2 && numThreads
<= config.numThreads * 2
+    val currentThreads = config.numThreads
+    if (numThreads <= 0)
+      throw new ConfigException(s"Log cleaner threads should be at least 1")
+    if (numThreads < currentThreads / 2)
+      throw new ConfigException(s"Log cleaner threads cannot be reduced to less than half
the current value $currentThreads")
+    if (numThreads > currentThreads * 2)
+      throw new ConfigException(s"Log cleaner threads cannot be increased to more than double
the current value $currentThreads")
+
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 58fa583..168654d 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.nio.charset.StandardCharsets
 import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.log.{LogCleaner, LogConfig, LogManager}
@@ -29,7 +29,8 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.{ConfigDef, ConfigException, SslConfigs}
 import org.apache.kafka.common.network.ListenerReconfigurable
-import org.apache.kafka.common.utils.Base64
+import org.apache.kafka.common.metrics.MetricsReporter
+import org.apache.kafka.common.utils.{Base64, Utils}
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -81,6 +82,7 @@ object DynamicBrokerConfig {
   AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
   AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs
   AllDynamicConfigs ++= DynamicThreadPool.ReconfigurableConfigs
+  AllDynamicConfigs ++= Set(KafkaConfig.MetricReporterClassesProp)
 
   private val PerBrokerConfigs = DynamicSecurityConfigs
 
@@ -134,6 +136,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
     if (kafkaServer.logManager.cleaner != null)
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
     addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
+    addReconfigurable(new DynamicMetricsReporters(brokerId, kafkaServer))
   }
 
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock)
{
@@ -295,6 +298,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
     }
   }
 
+  private[server] def maybeReconfigure(reconfigurable: Reconfigurable, oldConfig: KafkaConfig,
newConfig: util.Map[String, _]): Unit = {
+    if (reconfigurable.reconfigurableConfigs.asScala.exists(key => oldConfig.originals.get(key)
!= newConfig.get(key)))
+      reconfigurable.reconfigure(newConfig)
+  }
+
   private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[_,
_]): mutable.Map[String, _] = {
     newProps.asScala.filter {
       case (k, v) => v != currentProps.get(k)
@@ -388,10 +396,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends
Logging
                                           oldConfig: KafkaConfig,
                                           newConfig: KafkaConfig,
                                           validateOnly: Boolean): Unit = {
-    if (validateOnly) {
-      if (!reconfigurable.validateReconfiguration(newConfig))
-        throw new ConfigException("Validation of dynamic config update failed")
-    } else
+    if (validateOnly)
+      reconfigurable.validateReconfiguration(newConfig)
+    else
       reconfigurable.reconfigure(oldConfig, newConfig)
   }
 }
@@ -400,7 +407,7 @@ trait BrokerReconfigurable {
 
   def reconfigurableConfigs: Set[String]
 
-  def validateReconfiguration(newConfig: KafkaConfig): Boolean
+  def validateReconfiguration(newConfig: KafkaConfig): Unit
 
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
 }
@@ -463,12 +470,12 @@ object DynamicThreadPool {
 
 class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
 
-  override def reconfigurableConfigs(): Set[String] = {
+  override def reconfigurableConfigs: Set[String] = {
     DynamicThreadPool.ReconfigurableConfigs
   }
 
-  override def validateReconfiguration(newConfig: KafkaConfig): Boolean = {
-    newConfig.values.asScala.filterKeys(DynamicThreadPool.ReconfigurableConfigs.contains).forall
{ case (k, v) =>
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    newConfig.values.asScala.filterKeys(DynamicThreadPool.ReconfigurableConfigs.contains).foreach
{ case (k, v) =>
       val newValue = v.asInstanceOf[Int]
       val oldValue = currentValue(k)
       if (newValue != oldValue) {
@@ -480,7 +487,6 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable
{
         if (newValue > oldValue * 2)
           throw new ConfigException(s"$errorMsg, value should not be greater than double
the current value $oldValue")
       }
-      true
     }
   }
 
@@ -508,3 +514,78 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable
{
     }
   }
 }
+
+class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconfigurable
{
+
+  private val dynamicConfig = server.config.dynamicConfig
+  private val metrics = server.metrics
+  private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
+  private val currentReporters = mutable.Map[String, MetricsReporter]()
+
+  createReporters(dynamicConfig.currentKafkaConfig.getList(KafkaConfig.MetricReporterClassesProp),
+    Collections.emptyMap[String, Object])
+
+  private[server] def currentMetricsReporters: List[MetricsReporter] = currentReporters.values.toList
+
+  override def configure(configs: util.Map[String, _]): Unit = {}
+
+  override def reconfigurableConfigs(): util.Set[String] = {
+    val configs = new util.HashSet[String]()
+    configs.add(KafkaConfig.MetricReporterClassesProp)
+    currentReporters.values.foreach {
+      case reporter: Reconfigurable => configs.addAll(reporter.reconfigurableConfigs)
+      case _ =>
+    }
+    configs
+  }
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
+    val updatedMetricsReporters = metricsReporterClasses(configs)
+
+    // Ensure all the reporter classes can be loaded and have a default constructor
+    updatedMetricsReporters.foreach { className =>
+      val clazz = Utils.loadClass(className, classOf[MetricsReporter])
+      clazz.getConstructor()
+    }
+
+    // Validate the new configuration using every reconfigurable reporter instance that is
not being deleted
+    currentReporters.values.forall {
+      case reporter: Reconfigurable =>
+        !updatedMetricsReporters.contains(reporter.getClass.getName) || reporter.validateReconfiguration(configs)
+      case _ => true
+    }
+  }
+
+  override def reconfigure(configs: util.Map[String, _]): Unit = {
+    val updatedMetricsReporters = metricsReporterClasses(configs)
+    val deleted = currentReporters.keySet -- updatedMetricsReporters
+    deleted.foreach(removeReporter)
+    currentReporters.values.foreach {
+      case reporter: Reconfigurable => dynamicConfig.maybeReconfigure(reporter, dynamicConfig.currentKafkaConfig,
configs)
+      case _ =>
+    }
+    val added = updatedMetricsReporters -- currentReporters.keySet
+    createReporters(added.asJava, configs)
+  }
+
+  private def createReporters(reporterClasses: util.List[String],
+                              updatedConfigs: util.Map[String, _]): Unit = {
+    val props = new util.HashMap[String, AnyRef]
+    updatedConfigs.asScala.foreach { case (k, v) => props.put(k, v.asInstanceOf[AnyRef])
}
+    propsOverride.foreach { case (k, v) => props.put(k, v) }
+    val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses,
classOf[MetricsReporter], props)
+    reporters.asScala.foreach { reporter =>
+      metrics.addReporter(reporter)
+      currentReporters += reporter.getClass.getName -> reporter
+    }
+    server.notifyClusterListeners(reporters.asScala)
+  }
+
+  private def removeReporter(className: String): Unit = {
+    currentReporters.remove(className).foreach(metrics.removeReporter)
+  }
+
+  private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String]
= {
+    configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
+  }
+}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c4123f1..98e4877 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -186,10 +186,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM,
threadNameP
     try {
       info("starting")
 
-      if(isShuttingDown.get)
+      if (isShuttingDown.get)
         throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
 
-      if(startupComplete.get)
+      if (startupComplete.get)
         return
 
       val canStartup = isStartingUp.compareAndSet(false, true)
@@ -218,8 +218,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         this.logIdent = logContext.logPrefix
 
         /* create and configure metrics */
-        val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp,
classOf[MetricsReporter],
-            Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
+        val reporters = new util.ArrayList[MetricsReporter]
         reporters.add(new JmxReporter(jmxPrefix))
         val metricConfig = KafkaServer.metricConfig(config)
         metrics = new Metrics(metricConfig, reporters, time, true)
@@ -228,7 +227,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         _brokerTopicStats = new BrokerTopicStats
 
         quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
-        notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
+        notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
@@ -321,7 +320,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     }
   }
 
-  private def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
+  private[server] def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
     val clusterResourceListeners = new ClusterResourceListeners
     clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
     clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
@@ -387,7 +386,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   }
 
   /**
-   *  Performs controlled shutdown
+   * Performs controlled shutdown
    */
   private def controlledShutdown() {
 
@@ -637,7 +636,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     *
     * @return A 2-tuple containing the brokerId and a sequence of offline log directories.
     */
-  private def getBrokerIdAndOfflineDirs: (Int, Seq[String]) =  {
+  private def getBrokerIdAndOfflineDirs: (Int, Seq[String]) = {
     var brokerId = config.brokerId
     val brokerIdSet = mutable.HashSet[Int]()
     val offlineDirs = mutable.ArrayBuffer.empty[String]
@@ -649,24 +648,24 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM,
threadNameP
           brokerIdSet.add(brokerMetadata.brokerId)
         }
       } catch {
-        case e : IOException =>
+        case e: IOException =>
           offlineDirs += logDir
           error(s"Fail to read $brokerMetaPropsFile under log directory $logDir", e)
       }
     }
 
-    if(brokerIdSet.size > 1)
+    if (brokerIdSet.size > 1)
       throw new InconsistentBrokerIdException(
         s"Failed to match broker.id across log.dirs. This could happen if multiple brokers
shared a log directory (log.dirs) " +
         s"or partial data was manually copied from another broker. Found $brokerIdSet")
-    else if(brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last
!= brokerId)
+    else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last
!= brokerId)
       throw new InconsistentBrokerIdException(
         s"Configured broker.id $brokerId doesn't match stored broker.id ${brokerIdSet.last}
in meta.properties. " +
         s"If you moved your data, make sure your configured broker.id matches. " +
         s"If you intend to create a new broker, you should remove all data in your data directories
(log.dirs).")
-    else if(brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable)
 // generate a new brokerId from Zookeeper
+    else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable)
// generate a new brokerId from Zookeeper
       brokerId = generateBrokerId
-    else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
+    else if (brokerIdSet.size == 1) // pick broker.id from meta.properties
       brokerId = brokerIdSet.last
 
 
@@ -678,11 +677,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM,
threadNameP
 
     for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath))
{
       val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
-      if(brokerMetadataOpt.isEmpty)
+      if (brokerMetadataOpt.isEmpty)
         logDirsWithoutMetaProps ++= List(logDir)
     }
 
-    for(logDir <- logDirsWithoutMetaProps) {
+    for (logDir <- logDirsWithoutMetaProps) {
       val checkpoint = brokerMetadataCheckpoints(logDir)
       checkpoint.write(BrokerMetadata(brokerId))
     }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 819d672..374aac2 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -18,11 +18,14 @@
 
 package kafka.server
 
+import java.io.Closeable
 import java.io.File
 import java.nio.file.{Files, StandardCopyOption}
+import java.lang.management.ManagementFactory
 import java.util
 import java.util.{Collections, Properties}
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.{ConcurrentLinkedQueue, ExecutionException, TimeUnit}
+import javax.management.ObjectName
 
 import kafka.api.SaslSetup
 import kafka.log.LogConfig
@@ -35,12 +38,13 @@ import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable,
TopicPartition}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.SslConfigs._
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -48,6 +52,7 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
+import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
 
@@ -93,6 +98,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
       props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
       props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
       props.put(KafkaConfig.LogSegmentBytesProp, "2000")
+      props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000")
 
       props ++= sslProperties1
       addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
@@ -112,6 +118,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
 
     TestUtils.createTopic(zkClient, topic, numPartitions = 10, replicationFactor = numServers,
servers)
     createAdminClient(SecurityProtocol.SSL, SecureInternal)
+
+    TestMetricsReporter.testReporters.clear()
   }
 
   @After
@@ -351,15 +359,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
     props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
     reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogMessageTimestampTypeProp,
TimestampType.CREATE_TIME.toString))
     consumerThread.waitForMatchingRecords(record => record.timestampType == TimestampType.CREATE_TIME)
-
-    // Verify that even though broker defaults can be defined at default cluster level for
consistent
-    // configuration across brokers, they can also be defined at per-broker level for testing
-    props.clear()
-    props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000")
-    alterConfigsOnServer(servers.head, props)
-    assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
-    servers.tail.foreach { server => assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
}
-
     // Verify that invalid configs are not applied
     val invalidProps = Map(
       KafkaConfig.LogMessageTimestampDifferenceMaxMsProp -> "abc", // Invalid type
@@ -373,6 +372,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
       reconfigureServers(props, perBrokerConfig = false, (k, props.getProperty(k)), expectFailure
= true)
     }
 
+    // Verify that even though broker defaults can be defined at default cluster level for
consistent
+    // configuration across brokers, they can also be defined at per-broker level for testing
+    props.clear()
+    props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "500000")
+    alterConfigsOnServer(servers.head, props)
+    assertEquals(500000, servers.head.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
+    servers.tail.foreach { server => assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp))
}
+
     // Verify that produce/consume worked throughout this test without any retries in producer
     stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
   }
@@ -449,6 +456,81 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
       "", mayFailRequests = false)
   }
 
+  @Test
+  def testMetricsReporterUpdate(): Unit = {
+    // Add a new metrics reporter
+    val newProps = new Properties
+    newProps.put(TestMetricsReporter.PollingIntervalProp, "100")
+    configureMetricsReporters(Seq(classOf[TestMetricsReporter]), newProps)
+
+    val reporters = TestMetricsReporter.waitForReporters(servers.size)
+    reporters.foreach { reporter =>
+      reporter.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval = 100)
+      assertFalse("No metrics found", reporter.kafkaMetrics.isEmpty)
+      reporter.verifyMetricValue("request-total", "socket-server-metrics")
+    }
+
+    val clientId = "test-client-1"
+    val (producerThread, consumerThread) = startProduceConsume(retries = 0, clientId)
+    TestUtils.waitUntilTrue(() => consumerThread.received >= 5, "Messages not sent")
+
+    // Verify that JMX reporter is still active (test a metric registered after the dynamic
reporter update)
+    val mbeanServer = ManagementFactory.getPlatformMBeanServer
+    val byteRate = mbeanServer.getAttribute(new ObjectName(s"kafka.server:type=Produce,client-id=$clientId"),
"byte-rate")
+    assertTrue("JMX attribute not updated", byteRate.asInstanceOf[Double] > 0)
+
+    // Property not related to the metrics reporter config should not reconfigure reporter
+    newProps.setProperty("some.prop", "some.value")
+    reconfigureServers(newProps, perBrokerConfig = false, (TestMetricsReporter.PollingIntervalProp,
"100"))
+    reporters.foreach(_.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval
= 100))
+
+    // Update of custom config of metrics reporter should reconfigure reporter
+    newProps.put(TestMetricsReporter.PollingIntervalProp, "1000")
+    reconfigureServers(newProps, perBrokerConfig = false, (TestMetricsReporter.PollingIntervalProp,
"1000"))
+    reporters.foreach(_.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval
= 1000))
+
+    // Verify removal of metrics reporter
+    configureMetricsReporters(Seq.empty[Class[_]], newProps)
+    reporters.foreach(_.verifyState(reconfigureCount = 1, deleteCount = 1, pollingInterval
= 1000))
+    TestMetricsReporter.testReporters.clear()
+
+    // Verify recreation of metrics reporter
+    newProps.put(TestMetricsReporter.PollingIntervalProp, "2000")
+    configureMetricsReporters(Seq(classOf[TestMetricsReporter]), newProps)
+    val newReporters = TestMetricsReporter.waitForReporters(servers.size)
+    newReporters.foreach(_.verifyState(reconfigureCount = 0, deleteCount = 0, pollingInterval
= 2000))
+
+    // Verify that validation failure of metrics reporter fails reconfiguration and leaves
config unchanged
+    newProps.put(KafkaConfig.MetricReporterClassesProp, "unknownMetricsReporter")
+    reconfigureServers(newProps, perBrokerConfig = false, (TestMetricsReporter.PollingIntervalProp,
"2000"), expectFailure = true)
+    servers.foreach { server =>
+      assertEquals(classOf[TestMetricsReporter].getName, server.config.originals.get(KafkaConfig.MetricReporterClassesProp))
+    }
+    newReporters.foreach(_.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval
= 2000))
+
+    // Verify that validation failure of custom config fails reconfiguration and leaves config
unchanged
+    newProps.put(TestMetricsReporter.PollingIntervalProp, "invalid")
+    reconfigureServers(newProps, perBrokerConfig = false, (TestMetricsReporter.PollingIntervalProp,
"2000"), expectFailure = true)
+    newReporters.foreach(_.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval
= 2000))
+
+    // Delete reporters
+    configureMetricsReporters(Seq.empty[Class[_]], newProps)
+    TestMetricsReporter.testReporters.clear()
+
+    // Verify that even though metrics reporters can be defined at default cluster level
for consistent
+    // configuration across brokers, they can also be defined at per-broker level for testing
+    newProps.put(KafkaConfig.MetricReporterClassesProp, classOf[TestMetricsReporter].getName)
+    newProps.put(TestMetricsReporter.PollingIntervalProp, "4000")
+    alterConfigsOnServer(servers.head, newProps)
+    TestUtils.waitUntilTrue(() => !TestMetricsReporter.testReporters.isEmpty, "Metrics
reporter not created")
+    val perBrokerReporter = TestMetricsReporter.waitForReporters(1).head
+    perBrokerReporter.verifyState(reconfigureCount = 1, deleteCount = 0, pollingInterval
= 4000)
+    servers.tail.foreach { server => assertEquals("", server.config.originals.get(KafkaConfig.MetricReporterClassesProp))
}
+
+    // Verify that produce/consume worked throughout this test without any retries in producer
+    stopAndVerifyProduceConsume(producerThread, consumerThread)
+  }
+
   private def createProducer(trustStore: File, retries: Int,
                              clientId: String = "test-producer"): KafkaProducer[String, String]
= {
     val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
@@ -575,7 +657,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
         val exception = intercept[ExecutionException](alterResult.values.get(brokerResource).get)
         assertTrue(exception.getCause.isInstanceOf[InvalidRequestException])
       }
-      assertEquals(oldProps, servers.head.config.values.asScala.filterKeys(newProps.containsKey))
+      servers.foreach { server => assertEquals(oldProps, server.config.values.asScala.filterKeys(newProps.containsKey))
}
     } else {
       alterResult.all.get
       waitForConfig(aPropToVerify._1, aPropToVerify._2)
@@ -619,6 +701,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
     }
   }
 
+  private def configureMetricsReporters(reporters: Seq[Class[_]], props: Properties,
+                                       perBrokerConfig: Boolean = false): Unit = {
+    val reporterStr = reporters.map(_.getName).mkString(",")
+    props.put(KafkaConfig.MetricReporterClassesProp, reporterStr)
+    reconfigureServers(props, perBrokerConfig, (KafkaConfig.MetricReporterClassesProp, reporterStr))
+  }
+
   private def invalidSslConfigs: Properties = {
     val props = new Properties
     props.put(SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
@@ -644,8 +733,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
     assertTrue(s"Invalid threads: expected $expectedCount, got ${threads.size}: $threads",
resized)
   }
 
-  private def startProduceConsume(retries: Int): (ProducerThread, ConsumerThread) = {
-    val producerThread = new ProducerThread(retries)
+  private def startProduceConsume(retries: Int, producerClientId: String = "test-producer"):
(ProducerThread, ConsumerThread) = {
+    val producerThread = new ProducerThread(producerClientId, retries)
     clientThreads += producerThread
     val consumerThread = new ConsumerThread(producerThread)
     clientThreads += consumerThread
@@ -656,7 +745,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
   }
 
   private def stopAndVerifyProduceConsume(producerThread: ProducerThread, consumerThread:
ConsumerThread,
-                                                                                   mayFailRequests:
Boolean): Unit = {
+                                          mayFailRequests: Boolean = false): Unit = {
     TestUtils.waitUntilTrue(() => producerThread.sent >= 10, "Messages not sent")
     producerThread.shutdown()
     consumerThread.initiateShutdown()
@@ -669,8 +758,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
     }
   }
 
-  private class ProducerThread(retries: Int) extends ShutdownableThread("test-producer",
isInterruptible = false) {
-    private val producer = createProducer(trustStoreFile1, retries)
+  private class ProducerThread(clientId: String, retries: Int) extends ShutdownableThread(clientId,
isInterruptible = false) {
+    private val producer = createProducer(trustStoreFile1, retries, clientId)
     @volatile var sent = 0
     override def doWork(): Unit = {
         try {
@@ -719,3 +808,83 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
     }
   }
 }
+
+object TestMetricsReporter {
+  val PollingIntervalProp = "polling.interval"
+  val testReporters = new ConcurrentLinkedQueue[TestMetricsReporter]()
+
+  def waitForReporters(count: Int): List[TestMetricsReporter] = {
+    TestUtils.waitUntilTrue(() => testReporters.size == count, msg = "Metrics reporters
not created")
+
+    val reporters = testReporters.asScala.toList
+    TestUtils.waitUntilTrue(() => reporters.forall(_.configureCount == 1), msg = "Metrics
reporters not configured")
+    reporters
+  }
+}
+
+class TestMetricsReporter extends MetricsReporter with Reconfigurable with Closeable with
ClusterResourceListener {
+  import TestMetricsReporter._
+  val kafkaMetrics = ArrayBuffer[KafkaMetric]()
+  @volatile var initializeCount = 0
+  @volatile var configureCount = 0
+  @volatile var reconfigureCount = 0
+  @volatile var closeCount = 0
+  @volatile var clusterUpdateCount = 0
+  @volatile var pollingInterval: Int = -1
+  testReporters.add(this)
+
+  override def init(metrics: util.List[KafkaMetric]): Unit = {
+    kafkaMetrics ++= metrics.asScala
+    initializeCount += 1
+  }
+
+  override def configure(configs: util.Map[String, _]): Unit = {
+    configureCount += 1
+    pollingInterval = configs.get(PollingIntervalProp).toString.toInt
+  }
+
+  override def metricChange(metric: KafkaMetric): Unit = {
+  }
+
+  override def metricRemoval(metric: KafkaMetric): Unit = {
+    kafkaMetrics -= metric
+  }
+
+  override def onUpdate(clusterResource: ClusterResource): Unit = {
+    assertNotNull("Cluster id not set", clusterResource.clusterId)
+    clusterUpdateCount += 1
+  }
+
+  override def reconfigurableConfigs(): util.Set[String] = {
+    Set(PollingIntervalProp).asJava
+  }
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
+    configs.get(PollingIntervalProp).toString.toInt > 0
+  }
+
+  override def reconfigure(configs: util.Map[String, _]): Unit = {
+    reconfigureCount += 1
+    pollingInterval = configs.get(PollingIntervalProp).toString.toInt
+  }
+
+  override def close(): Unit = {
+    closeCount += 1
+  }
+
+  def verifyState(reconfigureCount: Int, deleteCount: Int, pollingInterval: Int): Unit =
{
+    assertEquals(1, initializeCount)
+    assertEquals(1, configureCount)
+    assertEquals(reconfigureCount, reconfigureCount)
+    assertEquals(deleteCount, closeCount)
+    assertEquals(1, clusterUpdateCount)
+    assertEquals(pollingInterval, this.pollingInterval)
+  }
+
+  def verifyMetricValue(name: String, group: String): Unit = {
+    val matchingMetrics = kafkaMetrics.filter(metric => metric.metricName.name == name
&& metric.metricName.group == group)
+    assertTrue("Metric not found", matchingMetrics.nonEmpty)
+    val total = matchingMetrics.foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double])
+    assertTrue("Invalid metric value", total > 0.0)
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message