kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-5867: Log Kafka Connect worker info during startup
Date Thu, 28 Sep 2017 05:07:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8e0e2a5b2 -> 1444b7b59


KAFKA-5867: Log Kafka Connect worker info during startup

Author: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3932 from kkonstantine/KAFKA-5867-Kafka-Connect-applications-should-log-info-message-when-starting-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1444b7b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1444b7b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1444b7b5

Branch: refs/heads/trunk
Commit: 1444b7b594f9c0c426a3631ec43d77b19c8f4373
Parents: 8e0e2a5
Author: Konstantine Karantasis <konstantine@confluent.io>
Authored: Wed Sep 27 22:07:37 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Sep 27 22:07:37 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/cli/ConnectDistributed.java   |  10 +-
 .../kafka/connect/cli/ConnectStandalone.java    |  12 ++-
 .../kafka/connect/runtime/WorkerInfo.java       | 107 +++++++++++++++++++
 3 files changed, 126 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1444b7b5/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 717ccd9..1b2f94e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -55,11 +56,17 @@ public class ConnectDistributed {
             Exit.exit(1);
         }
 
+        Time time = Time.SYSTEM;
+        log.info("Kafka Connect distributed worker initializing ...");
+        long initStart = time.hiResClockMs();
+        WorkerInfo initInfo = new WorkerInfo();
+        initInfo.logAll();
+
         String workerPropsFile = args[0];
         Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
-        Time time = Time.SYSTEM;
+        log.info("Scanning for plugin classes. This might take a moment ...");
         Plugins plugins = new Plugins(workerProps);
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig config = new DistributedConfig(workerProps);
@@ -81,6 +88,7 @@ public class ConnectDistributed {
         DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore,
                 advertisedUrl.toString());
         final Connect connect = new Connect(herder, rest);
+        log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
         try {
             connect.start();
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1444b7b5/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index c6d0e59..551c6b5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@@ -60,11 +61,17 @@ public class ConnectStandalone {
             Exit.exit(1);
         }
 
+        Time time = Time.SYSTEM;
+        log.info("Kafka Connect standalone worker initializing ...");
+        long initStart = time.hiResClockMs();
+        WorkerInfo initInfo = new WorkerInfo();
+        initInfo.logAll();
+
         String workerPropsFile = args[0];
         Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
-        Time time = Time.SYSTEM;
+        log.info("Scanning for plugin classes. This might take a moment ...");
         Plugins plugins = new Plugins(workerProps);
         plugins.compareAndSwapWithDelegatingLoader();
         StandaloneConfig config = new StandaloneConfig(workerProps);
@@ -77,7 +84,8 @@ public class ConnectStandalone {
 
         Herder herder = new StandaloneHerder(worker);
         final Connect connect = new Connect(herder, rest);
-        
+        log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
+
         try {
             connect.start();
             for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1444b7b5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java
new file mode 100644
index 0000000..7d13226
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerInfo.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Connect Worker system and runtime information.
+ */
+public class WorkerInfo {
+    private static final Logger log = LoggerFactory.getLogger(WorkerInfo.class);
+    private static final RuntimeMXBean RUNTIME;
+    private static final OperatingSystemMXBean OS;
+
+    static {
+        RUNTIME = ManagementFactory.getRuntimeMXBean();
+        OS = ManagementFactory.getOperatingSystemMXBean();
+    }
+
+    private final Map<String, Object> values;
+
+    /**
+     * Constructor.
+     */
+    public WorkerInfo() {
+        this.values = new LinkedHashMap<>();
+        addRuntimeInfo();
+        addSystemInfo();
+    }
+
+    /**
+     * Log the values of this object at level INFO.
+     */
+    // Equivalent to logAll in AbstractConfig
+    public void logAll() {
+        StringBuilder b = new StringBuilder();
+        b.append(getClass().getSimpleName());
+        b.append(" values: ");
+        b.append(Utils.NL);
+
+        for (Map.Entry<String, Object> entry : values.entrySet()) {
+            b.append('\t');
+            b.append(entry.getKey());
+            b.append(" = ");
+            b.append(format(entry.getValue()));
+            b.append(Utils.NL);
+        }
+        log.info(b.toString());
+    }
+
+    private static Object format(Object value) {
+        return value == null ? "NA" : value;
+    }
+
+    /**
+     * Collect general runtime information.
+     */
+    protected void addRuntimeInfo() {
+        List<String> jvmArgs = RUNTIME.getInputArguments();
+        values.put("jvm.args", Utils.join(jvmArgs, ", "));
+        String[] jvmSpec = {
+                RUNTIME.getVmVendor(),
+                RUNTIME.getVmName(),
+                RUNTIME.getSystemProperties().get("java.version"),
+                RUNTIME.getVmVersion()
+        };
+        values.put("jvm.spec", Utils.join(jvmSpec, ", "));
+        values.put("jvm.classpath", RUNTIME.getClassPath());
+    }
+
+    /**
+     * Collect system information.
+     */
+    protected void addSystemInfo() {
+        String[] osInfo = {
+                OS.getName(),
+                OS.getArch(),
+                OS.getVersion(),
+        };
+        values.put("os.spec", Utils.join(osInfo, ", "));
+        values.put("os.vcpus", String.valueOf(OS.getAvailableProcessors()));
+    }
+
+}


Mime
View raw message