kafka-commits mailing list archives

From damian...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6577: Fix Connect system tests and add debug messages
Date Thu, 22 Feb 2018 09:42:34 GMT
This is an automated email from the ASF dual-hosted git repository.

damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc19c3e  KAFKA-6577: Fix Connect system tests and add debug messages
fc19c3e is described below

commit fc19c3e6f243a8d1b3e27cdc912dc092bbd342e0
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Thu Feb 22 09:39:59 2018 +0000

    KAFKA-6577: Fix Connect system tests and add debug messages
    
    **NOTE: This should be backported to the `1.1` branch, and is currently a blocker for 1.1.**
    
    The `connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink` system test
    is failing with the SASL configuration without a sufficient explanation. During the test,
    the Connect worker fails to start, but the Connect log contains no useful information. There
    are actually several things that combine to cause the failure and make the problem difficult
    to understand.
    
    First, the `tests/kafkatest/tests/connect/templates/connect-standalone.properties` template
    only adds the broker's security configuration with the `producer.` and `consumer.` prefixes,
    and does not add it without a prefix. The worker uses the AdminClient to connect to the broker
    to get the Kafka cluster ID and to manage the three internal topics, and the AdminClient is
    configured via top-level properties. Because the SASL test requires the clients all connect
    using SASL, the lack of b [...]
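
To make the prefix problem concrete, here is a minimal sketch in Python. It is a simplified model rather than Connect's actual code: the helper names and the `SECURITY_KEYS` subset are assumptions made for illustration, and the only behaviour taken from the description above is that the AdminClient reads top-level properties while the producer and consumer read prefixed ones.

```python
# Illustrative subset of client security settings (assumption, not exhaustive).
SECURITY_KEYS = {"security.protocol", "sasl.mechanism"}


def prefixed_view(worker_props, prefix):
    """Settings a prefixed client would see, with the prefix stripped."""
    return {
        key[len(prefix):]: value
        for key, value in worker_props.items()
        if key.startswith(prefix)
    }


def top_level_security(worker_props):
    """Security settings visible without any prefix (what the admin client gets)."""
    return {k: v for k, v in worker_props.items() if k in SECURITY_KEYS}


# Worker properties as rendered by the template before the fix (hypothetical values).
before_fix = {
    "bootstrap.servers": "broker:9092",
    "producer.security.protocol": "SASL_PLAINTEXT",
    "consumer.security.protocol": "SASL_PLAINTEXT",
}

# After the fix the same settings are also emitted with no prefix.
after_fix = {**before_fix, "security.protocol": "SASL_PLAINTEXT"}

print(top_level_security(before_fix))  # no security settings for the admin client
print(top_level_security(after_fix))   # admin client now sees security.protocol
```

In this model the producer and consumer are configured in both cases, while the admin client only receives a security protocol once the unprefixed line is added.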
    
    Second, the default `request.timeout.ms` for the AdminClient (and the other clients) is
    120 seconds, so the AdminClient was retrying for 120 seconds before it would give up and throw
    an error. However, the test was only waiting for 60 seconds before determining that the service
    failed to start. This can be corrected by setting a shorter `request.timeout.ms` in the Connect
    distributed and standalone worker configurations (the test templates now use `request.timeout.ms=30000`).
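
The timing mismatch can be checked with a little arithmetic. The sketch below is only a model: it assumes the admin client reports its failure after roughly one `request.timeout.ms`, and the function name is hypothetical. The 120 second default and the 60 second test wait come from the description above; 30000 ms is the value the test templates in this commit set.

```python
def failure_reported_in_time(request_timeout_ms, startup_wait_s):
    """True if the client gives up (and logs an error) before the test stops waiting."""
    return request_timeout_ms / 1000.0 < startup_wait_s


STARTUP_WAIT_S = 60  # how long the system test waits for the worker to start

# Default timeout: the test gives up before the client reports anything.
print(failure_reported_in_time(120000, STARTUP_WAIT_S))

# Reduced timeout from the templates: the error is logged within the wait.
print(failure_reported_in_time(30000, STARTUP_WAIT_S))
```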
    
    Third, the Connect workers were recently changed to look up the Kafka cluster ID before
    starting the herder. This is unlike the older uses of the AdminClient to find and manage
    the internal topics, where a failure to connect was not necessarily logged correctly but was
    nevertheless skipped over, relying upon broker auto-topic creation to create the internal topics.
    (This may be why the test did not fail prior to the recent change to always require a successful
    AdminClient connection. [...]
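
The difference between the two startup flows can be sketched as follows. This is a hedged Python model, not the worker's Java code: `start_worker_old`, `start_worker_new`, `ConnectError`, and the `connect_admin` callable are all hypothetical names, and only the behaviour (skip over the failure versus fail before the herder starts) is taken from the description above.

```python
class ConnectError(Exception):
    """Stand-in for the worker's startup failure."""


def start_worker_old(connect_admin):
    """Older flow: an admin connection failure is skipped over."""
    try:
        connect_admin()
    except ConnectionError:
        pass  # internal topics are left to broker auto-topic creation
    return "herder started"


def start_worker_new(connect_admin):
    """Newer flow: the cluster ID lookup must succeed before the herder starts."""
    try:
        cluster_id = connect_admin()
    except ConnectionError as e:
        raise ConnectError(
            "Failed to connect to and describe Kafka cluster. "
            "Check worker's broker connection and security properties."
        ) from e
    return "herder started for cluster " + cluster_id


def misconfigured_admin():
    # Models an admin client that lacks the SASL settings it needs.
    raise ConnectionError("authentication failed")


print(start_worker_old(misconfigured_admin))  # startup proceeds despite the failure
try:
    start_worker_new(misconfigured_admin)
except ConnectError as err:
    print(err)  # startup stops with an actionable message
```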
    
    The `ConnectStandaloneFileTest.test_file_source_and_sink` system tests were run locally
    prior to this fix, and they failed in the same way as the nightly runs. Once these fixes were
    made, the locally run system tests passed.
    
    Author: Randall Hauch <rhauch@gmail.com>
    
    Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <me@ewencp.org>
    
    Closes #4610 from rhauch/kafka-6577-trunk
---
 .../main/java/org/apache/kafka/connect/cli/ConnectDistributed.java   | 1 +
 .../main/java/org/apache/kafka/connect/cli/ConnectStandalone.java    | 1 +
 .../org/apache/kafka/connect/storage/KafkaConfigBackingStore.java    | 1 +
 .../org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java    | 1 +
 .../org/apache/kafka/connect/storage/KafkaStatusBackingStore.java    | 1 +
 .../src/main/java/org/apache/kafka/connect/util/ConnectUtils.java    | 5 ++++-
 tests/kafkatest/tests/connect/connect_test.py                        | 2 +-
 .../kafkatest/tests/connect/templates/connect-distributed.properties | 3 +++
 .../kafkatest/tests/connect/templates/connect-standalone.properties  | 4 ++++
 9 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 4afa47d..3b7ec87 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -74,6 +74,7 @@ public class ConnectDistributed {
         DistributedConfig config = new DistributedConfig(workerProps);
 
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 1769905..413cb46 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -78,6 +78,7 @@ public class ConnectStandalone {
         StandaloneConfig config = new StandaloneConfig(workerProps);
 
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index b34e483..e51b365 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -432,6 +432,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal config topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index f29f3c2..fb8ad97 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -94,6 +94,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal offset topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 8ca21eb..6710808 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -157,6 +157,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         Runnable createTopics = new Runnable() {
             @Override
             public void run() {
+                log.debug("Creating admin client to manage Connect internal status topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                     admin.createTopics(topicDescription);
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 1945204..9f30236 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -40,6 +40,7 @@ public final class ConnectUtils {
     }
 
     public static String lookupKafkaClusterId(WorkerConfig config) {
+        log.info("Creating Kafka admin client");
         try (AdminClient adminClient = AdminClient.create(config.originals())) {
             return lookupKafkaClusterId(adminClient);
         }
@@ -53,13 +54,15 @@ public final class ConnectUtils {
                 log.info("Kafka cluster version is too old to return cluster ID");
                 return null;
             }
+            log.debug("Fetching Kafka cluster ID");
             String kafkaClusterId = clusterIdFuture.get();
             log.info("Kafka cluster ID: {}", kafkaClusterId);
             return kafkaClusterId;
         } catch (InterruptedException e) {
             throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
         } catch (ExecutionException e) {
-            throw new ConnectException("Failed to connect to and describe Kafka cluster", e);
+            throw new ConnectException("Failed to connect to and describe Kafka cluster. "
+                                       + "Check worker's broker connection and security properties.", e);
         }
     }
 }
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 9436119..3753876 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -91,7 +91,7 @@ class ConnectStandaloneFileTest(Test):
         self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
-                                                  consumer_timeout_ms=1000)
+                                                  consumer_timeout_ms=10000)
 
         self.zk.start()
         self.kafka.start()
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 6660e6c..a1d3de2 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -52,3 +52,6 @@ rest.advertised.host.name = {{ node.account.hostname }}
 # Reduce session timeouts so tests that kill workers don't need to wait as long to recover
 session.timeout.ms=10000
 consumer.session.timeout.ms=10000
+
+# Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client
+request.timeout.ms=30000
diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index 09c6487..5f079f7 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
+{{ kafka.security_config.client_config().props() }}
 {{ kafka.security_config.client_config().props("producer.") }}
 {{ kafka.security_config.client_config().props("consumer.") }}
 
@@ -32,3 +33,6 @@ internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 
 offset.storage.file.filename={{ OFFSETS_FILE }}
+
+# Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client
+request.timeout.ms=30000

-- 
To stop receiving notification emails like this one, please contact
damianguy@apache.org.
