kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (#6991)
Date Tue, 09 Jul 2019 06:07:32 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 289ac09  KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading
(#6991)
289ac09 is described below

commit 289ac092923a32e5a839750fcaef0ae0856d05b3
Author: Nacho Muñoz Gómez <nachomdo@gmail.com>
AuthorDate: Tue Jul 9 07:07:10 2019 +0100

    KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (#6991)
    
    A bug in `WorkerConfigTransformer` prevents the connector configuration reload when the
ConfigData TTL expires.
    
    The issue boils down to the fact that `worker.herder().restartConnector` is receiving
a null callback.
    
    ```
    [2019-06-17 14:34:12,320] INFO Scheduling a restart of connector workshop-incremental
in 60000 ms (org.apache.kafka.connect.runtime.WorkerConfigTransformer:88)
    [2019-06-17 14:34:12,321] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:227)
    java.lang.NullPointerException
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1187)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder$19.onCompletion(DistributedHerder.java:1183)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:273)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:219)
    ```
    This patch adds a callback which just logs the error.
    
    Reviewers: Robert Yokota <rayokota@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 .../apache/kafka/connect/runtime/WorkerConfigTransformer.java | 11 ++++++++++-
 .../kafka/connect/runtime/WorkerConfigTransformerTest.java    |  9 +++++----
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 3373d5c..1a799bb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
 import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
+import org.apache.kafka.connect.util.Callback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,7 +87,15 @@ public class WorkerConfigTransformer {
             }
         }
         log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
-        HerderRequest request = worker.herder().restartConnector(ttl, connectorName, null);
+        Callback<Void> cb = new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                if (error != null) {
+                    log.error("Unexpected error during connector restart: ", error);
+                }
+            }
+        };
+        HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb);
         connectorRequests.put(path, request);
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
index e30acb1..6f4bda6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigChangeCallback;
 import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.easymock.EasyMock;
+import static org.easymock.EasyMock.eq;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -34,6 +35,7 @@ import java.util.Set;
 
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_NONE;
+import static org.easymock.EasyMock.notNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.powermock.api.easymock.PowerMock.replayAll;
@@ -84,8 +86,7 @@ public class WorkerConfigTransformerTest {
     @Test
     public void testReplaceVariableWithTTLAndScheduleRestart() {
         EasyMock.expect(worker.herder()).andReturn(herder);
-        EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
-
+        EasyMock.expect(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), notNull())).andReturn(requestId);
         replayAll();
 
         Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY,
"${test:testPath:testKeyWithTTL}"));
@@ -95,13 +96,13 @@ public class WorkerConfigTransformerTest {
     @Test
     public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() {
         EasyMock.expect(worker.herder()).andReturn(herder);
-        EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
+        EasyMock.expect(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), notNull())).andReturn(requestId);
 
         EasyMock.expect(worker.herder()).andReturn(herder);
         EasyMock.expectLastCall();
         requestId.cancel();
         EasyMock.expectLastCall();
-        EasyMock.expect(herder.restartConnector(10L, MY_CONNECTOR, null)).andReturn(requestId);
+        EasyMock.expect(herder.restartConnector(eq(10L), eq(MY_CONNECTOR), notNull())).andReturn(requestId);
 
         replayAll();
 


Mime
View raw message