kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-2944: Replaced the NPE with a nicer error and clean exit and added debug message to assist with figuring this out.
Date Thu, 03 Mar 2016 02:51:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9cd65c46a -> 00a58f8e1


KAFKA-2944: Replaced the NPE with a nicer error and clean exit and added debug message to assist with figuring this out.

…ssage to assist with figuring this out.

Author: Gwen Shapira <cshapi@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #993 from gwenshap/KAFKA-2944


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/00a58f8e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/00a58f8e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/00a58f8e

Branch: refs/heads/trunk
Commit: 00a58f8e1e0c82c2948a8fdfacf812ec4865b339
Parents: 9cd65c4
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Wed Mar 2 18:50:59 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Mar 2 18:50:59 2016 -0800

----------------------------------------------------------------------
 .../connect/storage/KafkaConfigStorage.java     | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/00a58f8e/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
index 9bd191e..6a06fec 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
@@ -286,6 +286,7 @@ public class KafkaConfigStorage {
         }
 
         try {
+            log.debug("Writing connector configuration for connector " + connector + " configuration: " + properties);
             configLog.send(CONNECTOR_KEY(connector), serializedConfig);
             configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
@@ -334,6 +335,7 @@ public class KafkaConfigStorage {
             Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
             connectConfig.put("properties", taskConfigEntry.getValue());
             byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
+            log.debug("Writing configuration for task " + taskConfigEntry.getKey() + " configuration: " + taskConfigEntry.getValue());
             configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
         }
 
@@ -348,6 +350,7 @@ public class KafkaConfigStorage {
                 Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
                 connectConfig.put("tasks", taskCountEntry.getValue());
                 byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
+                log.debug("Writing commit for connector " + taskCountEntry.getKey() + " with " + taskCountEntry.getValue() + " tasks.");
                 configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
             }
 
@@ -396,6 +399,7 @@ public class KafkaConfigStorage {
                 synchronized (lock) {
                     if (value.value() == null) {
                         // Connector deletion will be written as a null value
+                        log.info("Removed connector " + connectorName + " due to null configuration. This is usually intentional and does not indicate an issue.");
                         connectorConfigs.remove(connectorName);
                     } else {
                         // Connector configs can be applied and callbacks invoked immediately
@@ -405,9 +409,10 @@ public class KafkaConfigStorage {
                         }
                         Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
                         if (!(newConnectorConfig instanceof Map)) {
-                            log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
+                            log.error("Invalid data for connector config (" + connectorName + "): properties filed should be a Map but is " + newConnectorConfig.getClass());
                             return;
                         }
+                        log.debug("Updating configuration for connector " + connectorName + " configuation: " + newConnectorConfig);
                         connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
                     }
                 }
@@ -421,13 +426,13 @@ public class KafkaConfigStorage {
                         return;
                     }
                     if (!(value.value() instanceof Map)) {
-                        log.error("Ignoring task configuration because it is in the wrong format: " + value.value());
+                        log.error("Ignoring task configuration for task " + taskId + " because it is in the wrong format: " + value.value());
                         return;
                     }
 
                     Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
                     if (!(newTaskConfig instanceof Map)) {
-                        log.error("Invalid data for task config: properties filed should be a Map but is " + newTaskConfig.getClass());
+                        log.error("Invalid data for task config (" + taskId + "): properties filed should be a Map but is " + newTaskConfig.getClass());
                         return;
                     }
 
@@ -436,6 +441,7 @@ public class KafkaConfigStorage {
                         deferred = new HashMap<>();
                         deferredTaskUpdates.put(taskId.connector(), deferred);
                     }
+                    log.debug("Storing new config for task " + taskId + " this will wait for a commit message before the new config will take effect. New config: " + newTaskConfig);
                     deferred.put(taskId, (Map<String, String>) newTaskConfig);
                 }
             } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
@@ -464,7 +470,7 @@ public class KafkaConfigStorage {
                     // resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
                     // exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
                     if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
-                        log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + value.value());
+                        log.error("Ignoring connector tasks configuration commit for connector " + connectorName + " because it is in the wrong format: " + value.value());
                         return;
                     }
 
@@ -476,11 +482,17 @@ public class KafkaConfigStorage {
                     // update of all tasks that are expected based on the number of tasks in the commit message.
                     Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
                     Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
+                    if (taskIdSet == null) {
+                        //TODO: Figure out why this happens (KAFKA-3321)
+                        log.error("Received a commit message for connector " + connectorName + " but there is no matching configuration for tasks in this connector. This should never happen.");
+                        return;
+                    }
                     if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
                         // Given the logic for writing commit messages, we should only hit this condition due to compacted
                         // historical data, in which case we would not have applied any updates yet and there will be no
                         // task config data already committed for the connector, so we shouldn't have to clear any data
                         // out. All we need to do is add the flag marking it inconsistent.
+                        log.debug("We have an incomplete set of task configs for connector " + connectorName + " probably due to compaction. So we are not doing anything with the new configuration.");
                         inconsistent.add(connectorName);
                     } else {
                         if (deferred != null) {


Mime
View raw message