kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.1 updated: MINOR: Avoid logging connector configuration in Connect framework (#5868)
Date Tue, 13 Nov 2018 22:53:09 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new eeba93e  MINOR: Avoid logging connector configuration in Connect framework (#5868)
eeba93e is described below

commit eeba93e894ce0acd1dcfe779e708a0c6adbf00c8
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Tue Nov 13 15:30:34 2018 -0600

    MINOR: Avoid logging connector configuration in Connect framework (#5868)
    
    Some connector configs may be sensitive, so we should avoid logging them.
    
    Reviewers: Alex Diachenko, Dustin Cote <dustin@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../connect/storage/KafkaConfigBackingStore.java   | 42 ++++++++++++----------
 1 file changed, 23 insertions(+), 19 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 2788cd0..e06bc96 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -291,7 +291,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
      */
     @Override
     public void putConnectorConfig(String connector, Map<String, String> properties)
{
-        log.debug("Writing connector configuration {} for connector {} configuration", properties,
connector);
+        log.debug("Writing connector configuration for connector '{}'", connector);
         Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
         connectConfig.put("properties", properties);
         byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0,
connectConfig);
@@ -304,7 +304,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
      */
     @Override
     public void removeConnectorConfig(String connector) {
-        log.debug("Removing connector configuration for connector {}", connector);
+        log.debug("Removing connector configuration for connector '{}'", connector);
         try {
             configLog.send(CONNECTOR_KEY(connector), null);
             configLog.send(TARGET_STATE_KEY(connector), null);
@@ -358,7 +358,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
             Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
             connectConfig.put("properties", taskConfig);
             byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0,
connectConfig);
-            log.debug("Writing configuration for task " + index + " configuration: " + taskConfig);
+            log.debug("Writing configuration for connector '{}' task {}", connector, index);
             ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
             configLog.send(TASK_KEY(connectorTaskId), serializedConfig);
             index++;
@@ -375,7 +375,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
             Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
             connectConfig.put("tasks", taskCount);
             byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0,
connectConfig);
-            log.debug("Writing commit for connector " + connector + " with " + taskCount
+ " tasks.");
+            log.debug("Writing commit for connector '{}' with {} tasks.", connector, taskCount);
             configLog.send(COMMIT_TASKS_KEY(connector), serializedConfig);
 
             // Read to end to ensure all the commit messages have been written
@@ -484,17 +484,17 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                         }
                         Object targetState = ((Map<String, Object>) value.value()).get("state");
                         if (!(targetState instanceof String)) {
-                            log.error("Invalid data for target state for connector ({}):
'state' field should be a Map but is {}",
+                            log.error("Invalid data for target state for connector '{}':
'state' field should be a Map but is {}",
                                     connectorName, targetState == null ? null : targetState.getClass());
                             return;
                         }
 
                         try {
                             TargetState state = TargetState.valueOf((String) targetState);
-                            log.debug("Setting target state for connector {} to {}", connectorName,
targetState);
+                            log.debug("Setting target state for connector '{}' to {}", connectorName,
targetState);
                             connectorTargetStates.put(connectorName, state);
                         } catch (IllegalArgumentException e) {
-                            log.error("Invalid target state for connector ({}): {}", connectorName,
targetState);
+                            log.error("Invalid target state for connector '{}': {}", connectorName,
targetState);
                             return;
                         }
                     }
@@ -511,22 +511,22 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                 synchronized (lock) {
                     if (value.value() == null) {
                         // Connector deletion will be written as a null value
-                        log.info("Removed connector " + connectorName + " due to null configuration.
This is usually intentional and does not indicate an issue.");
+                        log.info("Successfully processed removal of connector '{}'", connectorName);
                         connectorConfigs.remove(connectorName);
                         removed = true;
                     } else {
                         // Connector configs can be applied and callbacks invoked immediately
                         if (!(value.value() instanceof Map)) {
-                            log.error("Found connector configuration (" + record.key() +
") in wrong format: " + value.value().getClass());
+                            log.error("Found configuration for connector '{}' in wrong format:
{}", record.key(), value.value().getClass());
                             return;
                         }
                         Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
                         if (!(newConnectorConfig instanceof Map)) {
-                            log.error("Invalid data for connector config ({}): properties
field should be a Map but is {}", connectorName,
-                                    newConnectorConfig == null ? null : newConnectorConfig.getClass());
+                            log.error("Invalid data for config for connector '{}': 'properties'
field should be a Map but is {}",
+                                      connectorName, newConnectorConfig == null ? null :
newConnectorConfig.getClass());
                             return;
                         }
-                        log.debug("Updating configuration for connector " + connectorName
+ " configuration: " + newConnectorConfig);
+                        log.debug("Updating configuration for connector '{}'", connectorName);
                         connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
 
                         // Set the initial state of the connector to STARTED, which ensures
that any connectors
@@ -545,17 +545,21 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                 synchronized (lock) {
                     ConnectorTaskId taskId = parseTaskId(record.key());
                     if (taskId == null) {
-                        log.error("Ignoring task configuration because " + record.key() +
" couldn't be parsed as a task config key");
+                        log.error("Ignoring task configuration because {} couldn't be parsed
as a task config key", record.key());
+                        return;
+                    }
+                    if (value.value() == null) {
+                        log.error("Ignoring task configuration for task {} because it is
unexpectedly null", taskId);
                         return;
                     }
                     if (!(value.value() instanceof Map)) {
-                        log.error("Ignoring task configuration for task " + taskId + " because
it is in the wrong format: " + value.value());
+                        log.error("Ignoring task configuration for task {} because the value
is not a Map but is {}", taskId, value.value().getClass());
                         return;
                     }
 
                     Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
                     if (!(newTaskConfig instanceof Map)) {
-                        log.error("Invalid data for task config (" + taskId + "): properties
filed should be a Map but is " + newTaskConfig.getClass());
+                        log.error("Invalid data for config of task {} 'properties' field
should be a Map but is {}", taskId, newTaskConfig.getClass());
                         return;
                     }
 
@@ -564,7 +568,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                         deferred = new HashMap<>();
                         deferredTaskUpdates.put(taskId.connector(), deferred);
                     }
-                    log.debug("Storing new config for task " + taskId + " this will wait
for a commit message before the new config will take effect. New config: " + newTaskConfig);
+                    log.debug("Storing new config for task {}; this will wait for a commit
message before the new config will take effect.", taskId);
                     deferred.put(taskId, (Map<String, String>) newTaskConfig);
                 }
             } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
@@ -593,7 +597,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                     // resolve this (i.e., get the connector to recommit its configuration).
This inconsistent state is
                     // exposed in the snapshots provided via ClusterConfigState so they are
easy to handle.
                     if (!(value.value() instanceof Map)) { // Schema-less, so we get maps
instead of structs
-                        log.error("Ignoring connector tasks configuration commit for connector
" + connectorName + " because it is in the wrong format: " + value.value());
+                        log.error("Ignoring connector tasks configuration commit for connector
'{}' because it is in the wrong format: {}", connectorName, value.value());
                         return;
                     }
                     Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
@@ -608,7 +612,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                         // historical data, in which case we would not have applied any updates
yet and there will be no
                         // task config data already committed for the connector, so we shouldn't
have to clear any data
                         // out. All we need to do is add the flag marking it inconsistent.
-                        log.debug("We have an incomplete set of task configs for connector
" + connectorName + " probably due to compaction. So we are not doing anything with the new
configuration.");
+                        log.debug("We have an incomplete set of task configs for connector
'{}' probably due to compaction. So we are not doing anything with the new configuration.",
connectorName);
                         inconsistent.add(connectorName);
                     } else {
                         if (deferred != null) {
@@ -629,7 +633,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                 if (started)
                     updateListener.onTaskConfigUpdate(updatedTasks);
             } else {
-                log.error("Discarding config update record with invalid key: " + record.key());
+                log.error("Discarding config update record with invalid key: {}", record.key());
             }
         }
 


Mime
View raw message