kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-6026; Fix for indefinite wait in KafkaFutureImpl
Date Tue, 10 Oct 2017 00:58:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1f6494202 -> b411f57c1


KAFKA-6026; Fix for indefinite wait in KafkaFutureImpl

Author: bartdevylder <bartdevylder@gmail.com>
Author: Bart De Vylder <bartdevylder@gmail.com>

Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Ismael Juma <ismael@juma.me.uk>,
Jason Gustafson <jason@confluent.io>

Closes #4044 from bartdevylder/KAFKA-6026


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b411f57c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b411f57c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b411f57c

Branch: refs/heads/trunk
Commit: b411f57c1ce82d66ec2c837349a54357b322e803
Parents: 1f64942
Author: bartdevylder <bartdevylder@gmail.com>
Authored: Mon Oct 9 17:49:52 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Oct 9 17:57:13 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/internals/KafkaFutureImpl.java   | 4 ++--
 .../test/java/org/apache/kafka/common/KafkaFutureTest.java   | 8 ++++++++
 2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b411f57c/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index cb97e87..9ca019b 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -96,7 +96,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
         R await(long timeout, TimeUnit unit)
                 throws InterruptedException, ExecutionException, TimeoutException {
             long startMs = System.currentTimeMillis();
-            long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1;
+            long waitTimeMs = unit.toMillis(timeout);
             long delta = 0;
             synchronized (this) {
                 while (true) {
@@ -104,7 +104,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
                         wrapAndThrow(exception);
                     if (done)
                         return value;
-                    if (delta > waitTimeMs) {
+                    if (delta >= waitTimeMs) {
                         throw new TimeoutException();
                     }
                     this.wait(waitTimeMs - delta);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b411f57c/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 7d29bc5..71f3c3c 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
@@ -173,4 +174,11 @@ public class KafkaFutureTest {
         assertFalse(allFuture.isCompletedExceptionally());
         allFuture.get();
     }
+
+    @Test(expected = TimeoutException.class)
+    public void testFutureTimeoutWithZeroWait() throws Exception {
+        final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        future.get(0, TimeUnit.MILLISECONDS);
+    }
+
 }


Mime
View raw message