kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-1836 metadata.fetch.timeout.ms set to zero blocks forever; reviewed by Neha Narkhede and Ewen Cheslack-Postava
Date Mon, 12 Jan 2015 23:59:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b1b80860a -> ad4883a0c


KAFKA-1836 metadata.fetch.timeout.ms set to zero blocks forever; reviewed by Neha Narkhede
and Ewen Cheslack-Postava


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ad4883a0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ad4883a0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ad4883a0

Branch: refs/heads/trunk
Commit: ad4883a0cd5afc4219e28cdafbd98576eeaee2d1
Parents: b1b8086
Author: Jaikiran Pai <jai.forums2013@gmail.com>
Authored: Mon Jan 12 15:58:36 2015 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon Jan 12 15:59:16 2015 -0800

----------------------------------------------------------------------
 .../clients/producer/internals/Metadata.java    | 13 +++++--
 .../kafka/clients/producer/MetadataTest.java    | 41 ++++++++++++++++----
 2 files changed, 43 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ad4883a0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 1d30f9e..dcf4658 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
  * <p>
  * This class is shared by the client thread (for partitioning) and the background sender thread.
  * 
- * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metdata for a
+ * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
  * topic we don't have any metadata for it will trigger a metadata update.
  */
 public final class Metadata {
@@ -99,12 +99,17 @@ public final class Metadata {
     /**
      * Wait for metadata update until the current version is larger than the last version we know of
      */
-    public synchronized void awaitUpdate(int lastVerison, long maxWaitMs) {
+    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+        if (maxWaitMs < 0) {
+            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
+        }
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
-        while (this.version <= lastVerison) {
+        while (this.version <= lastVersion) {
             try {
-                wait(remainingWaitMs);
+                if (remainingWaitMs != 0) {
+                    wait(remainingWaitMs);
+                }
             } catch (InterruptedException e) { /* this is fine */
             }
             long elapsed = System.currentTimeMillis() - begin;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad4883a0/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 4547bfc..74605c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -3,24 +3,23 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class MetadataTest {
 
     private long refreshBackoffMs = 100;
@@ -49,13 +48,42 @@ public class MetadataTest {
         assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
     }
 
+    /**
+     * Tests that {@link org.apache.kafka.clients.producer.internals.Metadata#awaitUpdate(int, long)} doesn't
+     * wait forever with a max timeout value of 0
+     *
+     * @throws Exception
+     * @see https://issues.apache.org/jira/browse/KAFKA-1836
+     */
+    @Test
+    public void testMetadataUpdateWaitTime() throws Exception {
+        long time = 0;
+        metadata.update(Cluster.empty(), time);
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+        // first try with a max wait time of 0 and ensure that this returns back without waiting forever
+        try {
+            metadata.awaitUpdate(metadata.requestUpdate(), 0);
+            fail("Wait on metadata update was expected to timeout, but it didn't");
+        } catch (TimeoutException te) {
+            // expected
+        }
+        // now try with a higher timeout value once
+        final long TWO_SECOND_WAIT = 2000;
+        try {
+            metadata.awaitUpdate(metadata.requestUpdate(), TWO_SECOND_WAIT);
+            fail("Wait on metadata update was expected to timeout, but it didn't");
+        } catch (TimeoutException te) {
+            // expected
+        }
+    }
+
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {
                 while (metadata.fetch().partitionsForTopic(topic) == null) {
                     try {
                         metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs);
-                    } catch(TimeoutException e) {
+                    } catch (TimeoutException e) {
                         // let it go
                     }
                 }
@@ -64,5 +92,4 @@ public class MetadataTest {
         thread.start();
         return thread;
     }
-
 }


Mime
View raw message