kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-4756; The auto-generated broker id should be passed to MetricRe…
Date Mon, 13 Feb 2017 21:46:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 3ac10d5ae -> dfbe944e9


KAFKA-4756; The auto-generated broker id should be passed to MetricReporter.configure

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #2540 from cmccabe/KAFKA-4756

(cherry picked from commit ed91af512fcb5c0ea3032ed259451817c96b5ee1)
Signed-off-by: Jun Rao <junrao@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dfbe944e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dfbe944e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dfbe944e

Branch: refs/heads/0.10.2
Commit: dfbe944e96e4f26bb99bf122e8cac122d069764c
Parents: 3ac10d5
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Mon Feb 13 13:46:37 2017 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Feb 13 13:46:50 2017 -0800

----------------------------------------------------------------------
 .../kafka/common/config/AbstractConfig.java     | 17 ++++++++++++-
 .../apache/kafka/common/metrics/Metrics.java    |  4 ++++
 .../main/scala/kafka/server/KafkaConfig.scala   |  2 +-
 .../main/scala/kafka/server/KafkaServer.scala   | 23 +++++++++---------
 .../KafkaMetricReporterClusterIdTest.scala      | 25 +++++++++++++++++++-
 5 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dfbe944e/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 096047f..5bc0570 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -217,10 +217,25 @@ public class AbstractConfig {
      * @return The list of configured instances
      */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
+        return getConfiguredInstances(key, t, Collections.EMPTY_MAP);
+    }
+
+    /**
+     * Get a list of configured instances of the given class specified by the given configuration key. The configuration
+     * may specify either null or an empty string to indicate no configured instances. In both cases, this method
+     * returns an empty list to indicate no configured instances.
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
+     * @param configOverrides Configuration overrides to use.
+     * @return The list of configured instances
+     */
+    public <T> List<T> getConfiguredInstances(String key, Class<T> t, Map<String, Object> configOverrides) {
         List<String> klasses = getList(key);
         List<T> objects = new ArrayList<T>();
         if (klasses == null)
             return objects;
+        Map<String, Object> configPairs = originals();
+        configPairs.putAll(configOverrides);
         for (Object klass : klasses) {
             Object o;
             if (klass instanceof String) {
@@ -236,7 +251,7 @@ public class AbstractConfig {
             if (!t.isInstance(o))
                 throw new KafkaException(klass + " is not an instance of " + t.getName());
             if (o instanceof Configurable)
-                ((Configurable) o).configure(originals());
+                ((Configurable) o).configure(configPairs);
             objects.add(t.cast(o));
         }
         return objects;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfbe944e/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 7b303fa..512c18e 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -443,6 +443,10 @@ public class Metrics implements Closeable {
         return this.metrics;
     }
 
+    public List<MetricsReporter> reporters() {
+        return this.reporters;
+    }
+
     public KafkaMetric metric(MetricName metricName) {
         return this.metrics.get(metricName);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfbe944e/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c4f54c9..2577543 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -824,6 +824,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
+
   val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
   val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
   val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
@@ -940,7 +941,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   /** ********* Metric Configuration **************/
   val metricNumSamples = getInt(KafkaConfig.MetricNumSamplesProp)
   val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
-  val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter])
   val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
 
   /** ********* SSL Configuration **************/

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfbe944e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b5075f9..68a62a4 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -101,13 +101,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private var shutdownLatch = new CountDownLatch(1)
 
   private val jmxPrefix: String = "kafka.server"
-  private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
-  reporters.add(new JmxReporter(jmxPrefix))
 
   var metrics: Metrics = null
 
-  private val metricConfig: MetricConfig = KafkaServer.metricConfig(config)
-
   val brokerState: BrokerState = new BrokerState
 
   var apis: KafkaApis = null
@@ -182,9 +178,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
       val canStartup = isStartingUp.compareAndSet(false, true)
       if (canStartup) {
-        metrics = new Metrics(metricConfig, reporters, time, true)
-        quotaManagers = QuotaFactory.instantiate(config, metrics, time)
-
         brokerState.newState(Starting)
 
         /* start scheduler */
@@ -197,16 +190,24 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         _clusterId = getOrGenerateClusterId(zkUtils)
         info(s"Cluster ID = $clusterId")
 
+        /* generate brokerId */
+        config.brokerId =  getBrokerId
+        this.logIdent = "[Kafka Server " + config.brokerId + "], "
+
+        /* create and configure metrics */
+        val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
+            Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
+        reporters.add(new JmxReporter(jmxPrefix))
+        val metricConfig = KafkaServer.metricConfig(config)
+        metrics = new Metrics(metricConfig, reporters, time, true)
+
+        quotaManagers = QuotaFactory.instantiate(config, metrics, time)
         notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
 
         /* start log manager */
         logManager = createLogManager(zkUtils.zkClient, brokerState)
         logManager.startup()
 
-        /* generate brokerId */
-        config.brokerId =  getBrokerId
-        this.logIdent = "[Kafka Server " + config.brokerId + "], "
-
         metadataCache = new MetadataCache(config.brokerId)
         credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfbe944e/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
index d235d02..dfcb4ac 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
@@ -28,6 +28,7 @@ import org.junit.{After, Before, Test}
 import org.apache.kafka.test.TestUtils.isValidClusterId
 
 object KafkaMetricReporterClusterIdTest {
+  val setupError = new AtomicReference[String]("")
 
   class MockKafkaMetricsReporter extends KafkaMetricsReporter with ClusterResourceListener {
 
@@ -52,8 +53,26 @@ object KafkaMetricReporterClusterIdTest {
     override def onUpdate(clusterMetadata: ClusterResource) {
       MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata)
     }
-  }
 
+    override def configure(configs: java.util.Map[String, _]): Unit = {
+      // Check that the configuration passed to the MetricsReporter includes the broker id as an Integer.
+      // This is a regression test for KAFKA-4756.
+      //
+      // Because this code is run during the test setUp phase, if we throw an exception here,
+      // it just results in the test itself being declared "not found" rather than failing.
+      // So we track an error message which we will check later in the test body.
+      val brokerId = configs.get(KafkaConfig.BrokerIdProp)
+      if (brokerId == null)
+        setupError.compareAndSet("", "No value was set for the broker id.")
+      else if (!brokerId.isInstanceOf[String])
+        setupError.compareAndSet("", "The value set for the broker id was not a string.")
+      try
+        Integer.parseInt(brokerId.asInstanceOf[String])
+      catch {
+        case e: Exception => setupError.compareAndSet("", "Error parsing broker id " + e.toString)
+      }
+    }
+  }
 }
 
 class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
@@ -66,6 +85,8 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
     val props = TestUtils.createBrokerConfig(1, zkConnect)
     props.setProperty("kafka.metrics.reporters", "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
     props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
+    props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
+    props.setProperty(KafkaConfig.BrokerIdProp, "-1")
     config = KafkaConfig.fromProps(props)
     server = KafkaServerStartable.fromProps(props)
     server.startup()
@@ -73,6 +94,8 @@ class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
 
   @Test
   def testClusterIdPresent() {
+    assertEquals("", KafkaMetricReporterClusterIdTest.setupError.get())
+
     assertNotNull(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META)
     isValidClusterId(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId())
 


Mime
View raw message