kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Increase `awaitCommits` timeout in ExampleConnectIntegrationTest (#7061)
Date Tue, 16 Jul 2019 19:01:09 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f65c71c  MINOR: Increase `awaitCommits` timeout in ExampleConnectIntegrationTest (#7061)
f65c71c is described below

commit f65c71cf6e13973ebff8ef590eb3d369da4d1b11
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Jul 16 12:00:49 2019 -0700

    MINOR: Increase `awaitCommits` timeout in ExampleConnectIntegrationTest (#7061)
    
    The transient failures are usually caused by a timeout in `awaitCommits`. This patch increases the timeout from 15s to 30s.
    
    Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Matthias J. Sax <mjsax@apache.org>
---
 .../java/org/apache/kafka/connect/runtime/WorkerSourceTask.java    | 2 +-
 .../kafka/connect/integration/ExampleConnectIntegrationTest.java   | 7 ++++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9ab4f41..6e94c6f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -225,7 +225,7 @@ class WorkerSourceTask extends WorkerTask {
                 }
                 if (toSend == null)
                     continue;
-                log.debug("{} About to send " + toSend.size() + " records to Kafka", this);
+                log.trace("{} About to send {} records to Kafka", this, toSend.size());
                 if (!sendRecords())
                     stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
             }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 224d6ac..35a76a5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -55,7 +55,7 @@ public class ExampleConnectIntegrationTest {
 
     private static final int NUM_RECORDS_PRODUCED = 2000;
     private static final int NUM_TOPIC_PARTITIONS = 3;
-    private static final int RECORD_TRANSFER_DURATION_MS = 5000;
+    private static final int RECORD_TRANSFER_DURATION_MS = 30000;
     private static final int CONNECTOR_SETUP_DURATION_MS = 15000;
     private static final int NUM_TASKS = 3;
     private static final int NUM_WORKERS = 3;
@@ -142,7 +142,7 @@ public class ExampleConnectIntegrationTest {
         connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
 
         // wait for the connector tasks to commit all records.
-        connectorHandle.awaitCommits(CONNECTOR_SETUP_DURATION_MS);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
 
         // delete connector
         connect.deleteConnector(CONNECTOR_NAME);
@@ -162,6 +162,7 @@ public class ExampleConnectIntegrationTest {
         props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("topic", "test-topic");
+        props.put("throughput", String.valueOf(500));
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -178,7 +179,7 @@ public class ExampleConnectIntegrationTest {
         connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
 
         // wait for the connector tasks to commit enough records
-        connectorHandle.awaitCommits(CONNECTOR_SETUP_DURATION_MS);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
 
         // consume all records from the source topic or fail, to ensure that they were correctly produced
         int recordNum = connect.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic").count();


Mime
View raw message