kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-1901; Move Kafka version to be generated in code by build (instead of in manifest); reviewed by Ismael Juma, Joel Koshy, Jason Rosenberg
Date Thu, 20 Aug 2015 20:28:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 767a8a762 -> 9a6b57334


KAFKA-1901; Move Kafka version to be generated in code by build (instead of in manifest);
reviewed by Ismael Juma, Joel Koshy, Jason Rosenberg


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9a6b5733
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9a6b5733
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9a6b5733

Branch: refs/heads/trunk
Commit: 9a6b57334953ef86603454e5b0d1287ee4281d03
Parents: 767a8a7
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Thu Aug 20 13:24:14 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Aug 20 13:24:14 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    | 44 +++++++++
 .../kafka/clients/consumer/KafkaConsumer.java   | 10 +-
 .../kafka/clients/producer/KafkaProducer.java   | 10 +-
 .../kafka/common/utils/AppInfoParser.java       | 97 ++++++++++++++++++++
 core/src/main/scala/kafka/common/AppInfo.scala  | 44 ++++-----
 .../main/scala/kafka/server/KafkaServer.scala   |  4 +-
 .../kafka/server/KafkaServerStartable.scala     |  1 -
 7 files changed, 174 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9a6b5733/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 17fc223..a9bf059 100644
--- a/build.gradle
+++ b/build.gradle
@@ -43,6 +43,7 @@ allprojects {
 
 ext {
     gradleVersion = "2.4"
+    buildVersionFileName = "kafka-version.properties"
 
     skipSigning = project.hasProperty('skipSigning') && skipSigning.toBoolean()
     shouldSign = !skipSigning && !version.endsWith("SNAPSHOT")
@@ -396,6 +397,49 @@ project(':clients') {
     testRuntime "$slf4jlog4j"
   }
 
+  task determineCommitId {
+    ext.commitId = "unknown"
+    def takeFromHash = 16
+    if (file("../.git/HEAD").exists()) {
+      def headRef = file("../.git/HEAD").text
+      if (headRef.contains('ref: ')) {
+        headRef = headRef.replaceAll('ref: ', '').trim()
+        commitId = file("../.git/$headRef").text.trim().take(takeFromHash)
+      } else {
+        commitId = headRef.trim().take(takeFromHash)
+      }
+    } else {
+      commitId
+    }
+  }
+
+  task createVersionFile(dependsOn: determineCommitId) {
+    ext.receiptFile = file("$buildDir/kafka/$buildVersionFileName")
+    outputs.file receiptFile
+    outputs.upToDateWhen { false }
+    doLast {
+      def data = [
+        commitId: determineCommitId.commitId,
+        version: version,
+      ]
+
+      receiptFile.parentFile.mkdirs()
+      def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
+      receiptFile.setText(content, "ISO-8859-1")
+    }
+  }
+
+  jar {
+    dependsOn createVersionFile
+    from("$buildDir") {
+        include "kafka/$buildVersionFileName"
+    }
+  }
+
+  clean.doFirst {
+    delete "$buildDir/kafka/"
+  }
+
   task testJar(type: Jar) {
     classifier = 'test'
     from sourceSets.test.output

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a6b5733/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3749880..8b54acd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -397,7 +398,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
     private static final long NO_CURRENT_THREAD = -1L;
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final String JMX_PREFIX = "kafka.consumer";
 
+    private String clientId;
     private final Coordinator coordinator;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -512,13 +515,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                             TimeUnit.MILLISECONDS);
-            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
-            String jmxPrefix = "kafka.consumer";
+            clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                     MetricsReporter.class);
-            reporters.add(new JmxReporter(jmxPrefix));
+            reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
@@ -582,6 +584,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     this.retryBackoffMs);
 
             config.logUnused();
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
 
             if (autoCommit)
                 scheduleAutoCommitTask(autoCommitIntervalMs);
@@ -1107,6 +1110,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         ClientUtils.closeQuietly(client, "consumer network client", firstException);
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
         ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
         log.debug("The Kafka consumer has closed.");
         if (firstException.get() != null && !swallowException) {
             throw new KafkaException("Failed to close kafka consumer", firstException.get());

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a6b5733/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c4621e2..804d569 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -122,7 +123,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
     private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final String JMX_PREFIX = "kafka.producer";
 
+    private String clientId;
     private final Partitioner partitioner;
     private final int maxRequestSize;
     private final long metadataFetchTimeoutMs;
@@ -199,13 +202,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                             TimeUnit.MILLISECONDS);
-            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
+            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
-            String jmxPrefix = "kafka.producer";
             List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                     MetricsReporter.class);
-            reporters.add(new JmxReporter(jmxPrefix));
+            reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -267,6 +269,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 this.valueSerializer = valueSerializer;
             }
             config.logUnused();
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
             log.debug("Kafka producer started");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
@@ -601,6 +604,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
         ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
         log.debug("The Kafka producer has closed.");
         if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to close kafka producer", firstException.get());

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a6b5733/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
new file mode 100644
index 0000000..d0fd12a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.lang.management.ManagementFactory;
+import java.util.Properties;
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AppInfoParser {
+    private static final Logger log = LoggerFactory.getLogger(AppInfoParser.class);
+    private static String version = "unknown";
+    private static String commitId = "unknown";
+
+    static {
+        try {
+            Properties props = new Properties();
+            props.load(AppInfoParser.class.getResourceAsStream("/kafka/kafka-version.properties"));
+            version = props.getProperty("version", version).trim();
+            commitId = props.getProperty("commitId", commitId).trim();
+        } catch (Exception e) {
+            log.warn("Error while loading kafka-version.properties :" + e.getMessage());
+        }
+    }
+
+    public static String getVersion() {
+        return version;
+    }
+
+    public static String getCommitId() {
+        return commitId;
+    }
+
+    public static void registerAppInfo(String prefix, String id) {
+        try {
+            ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
+            AppInfo mBean = new AppInfo();
+            ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
+        } catch (JMException e) {
+            log.warn("Error registering AppInfo mbean", e);
+        }
+    }
+
+    public static void unregisterAppInfo(String prefix, String id) {
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        try {
+            ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
+            if (server.isRegistered(name))
+                server.unregisterMBean(name);
+        } catch (JMException e) {
+            log.warn("Error unregistering AppInfo mbean", e);
+        }
+    }
+
+    public interface AppInfoMBean {
+        public String getVersion();
+        public String getCommitId();
+    }
+
+    public static class AppInfo implements AppInfoMBean {
+
+        public AppInfo() {
+            log.info("Kafka version : " + AppInfoParser.getVersion());
+            log.info("Kafka commitId : " + AppInfoParser.getCommitId());
+        }
+
+        @Override
+        public String getVersion() {
+            return AppInfoParser.getVersion();
+        }
+
+        @Override
+        public String getCommitId() {
+            return AppInfoParser.getCommitId();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a6b5733/core/src/main/scala/kafka/common/AppInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AppInfo.scala b/core/src/main/scala/kafka/common/AppInfo.scala
index d642ca5..8e2f49d 100644
--- a/core/src/main/scala/kafka/common/AppInfo.scala
+++ b/core/src/main/scala/kafka/common/AppInfo.scala
@@ -17,11 +17,9 @@
 
 package kafka.common
 
-import java.net.URL
-import java.util.jar.{Attributes, Manifest}
-
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import org.apache.kafka.common.utils.AppInfoParser
 
 object AppInfo extends KafkaMetricsGroup {
   private var isRegistered = false
@@ -34,33 +32,23 @@ object AppInfo extends KafkaMetricsGroup {
       }
     }
 
-    try {
-      val clazz = AppInfo.getClass
-      val className = clazz.getSimpleName + ".class"
-      val classPath = clazz.getResource(className).toString
-      if (!classPath.startsWith("jar")) {
-        // Class not from JAR
-        return
-      }
-      val manifestPath = classPath.substring(0, classPath.lastIndexOf("!") + 1) + "/META-INF/MANIFEST.MF"
-
-      val mf = new Manifest
-      mf.read(new URL(manifestPath).openStream())
-      val version = mf.getMainAttributes.get(new Attributes.Name("Version")).toString
+    newGauge("Version",
+      new Gauge[String] {
+        def value = {
+          AppInfoParser.getVersion()
+        }
+      })
 
-      newGauge("Version",
-        new Gauge[String] {
-          def value = {
-            version
-          }
-        })
+    newGauge("CommitID",
+      new Gauge[String] {
+        def value = {
+          AppInfoParser.getCommitId();
+        }
+      })
 
-      lock.synchronized {
-        isRegistered = true
-      }
-    } catch {
-      case e: Exception =>
-        warn("Can't read Kafka version from MANIFEST.MF. Possible cause: %s".format(e))
+    lock.synchronized {
+      isRegistered = true
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a6b5733/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0e7ba3e..17db4fa 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.network.NetworkReceive
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
+import org.apache.kafka.common.utils.AppInfoParser
 
 import scala.collection.mutable
 import org.I0Itec.zkclient.ZkClient
@@ -44,7 +45,6 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator}
 
-
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
   // I'm listing out individual properties here since the names are slightly different in each Config class...
@@ -212,6 +212,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
         isStartingUp.set(false)
+        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
         info("started")
       }
     }
@@ -385,6 +386,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
         startupComplete.set(false)
         isShuttingDown.set(false)
+        AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString)
         shutdownLatch.countDown()
         info("shut down completed")
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a6b5733/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 1c1b75b..0001331 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -27,7 +27,6 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
   def startup() {
     try {
       server.startup()
-      AppInfo.registerInfo()
     }
     catch {
       case e: Throwable =>


Mime
View raw message