kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
Date Mon, 05 Oct 2020 19:27:30 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 4576fdc  KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
4576fdc is described below

commit 4576fdc42b42d8a53872ad57cc5aaee76d99d806
Author: Vikas Singh <vikas@confluent.io>
AuthorDate: Mon Oct 5 12:06:55 2020 -0700

    KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
    
    System.currentTimeMillis() is not monotonic, so using it to calculate the time to sleep
    can result in negative values, which causes Thread.sleep to throw IllegalArgumentException.
    
    This change checks for that case and sleeps for a second (to avoid a tight loop) if the
    value returned is negative.
    
    Author: Shaik Zakir Hussain <zhussain@confluent.io>
    Reviewer: Randall Hauch <rhauch@gmail.com>
---
 .../java/org/apache/kafka/connect/util/KafkaBasedLog.java | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 69d2588..5248715 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +44,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -70,7 +70,8 @@ import java.util.concurrent.Future;
  */
 public class KafkaBasedLog<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
-    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+    private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
+    private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
 
     private Time time;
     private final String topic;
@@ -133,11 +134,13 @@ public class KafkaBasedLog<K, V> {
         List<TopicPartition> partitions = new ArrayList<>();
 
         // We expect that the topics will have been created either manually by the user or
automatically by the herder
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS)
{
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        long started = time.nanoseconds();
+        long sleepMs = 100;
+        while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS)
{
+            time.sleep(sleepMs);
+            sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
             partitionInfos = consumer.partitionsFor(topic);
-            Utils.sleep(Math.min(time.milliseconds() - started, 1000));
         }
         if (partitionInfos == null)
             throw new ConnectException("Could not look up partition metadata for offset backing
store topic in" +


Mime
View raw message