kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4274; Consumer `offsetForTimes` times out on empty map
Date Sat, 08 Oct 2016 01:45:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 1a23ac5cb -> cbf16eac7


KAFKA-4274; Consumer `offsetForTimes` times out on empty map

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #1993 from becketqin/KAFKA-4274

(cherry picked from commit 6b91f83fbaef26fefb9ec221529c29872a04c004)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cbf16eac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cbf16eac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cbf16eac

Branch: refs/heads/0.10.1
Commit: cbf16eac7b8db0d7335989c2ecc4a057f1debfe8
Parents: 1a23ac5
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Oct 7 18:43:24 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri Oct 7 18:45:30 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/consumer/internals/Fetcher.java | 3 +++
 .../org/apache/kafka/clients/consumer/internals/FetcherTest.java  | 2 ++
 core/src/main/scala/kafka/api/ApiVersion.scala                    | 2 +-
 3 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cbf16eac/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 17ab398..9e9ae92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -362,6 +362,9 @@ public class Fetcher<K, V> {
 
     public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition,
Long> timestampsToSearch,
                                                                      long timeout) {
+        if (timestampsToSearch.isEmpty())
+            return Collections.emptyMap();
+
         long startMs = time.milliseconds();
         long remaining = timeout;
         do {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbf16eac/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4f56796..faf6efa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -648,6 +648,8 @@ public class FetcherTest {
 
     @Test
     public void testGetOffsetsForTimes() {
+        // Empty map
+        assertTrue(fetcher.getOffsetsByTimes(new HashMap<TopicPartition, Long>(), 100L).isEmpty());
         // Error code none with unknown offset
         testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
         // Error code none with known offset

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbf16eac/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 0d9775a..895c1b1 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -57,7 +57,7 @@ object ApiVersion {
     "0.10.1-IV0" -> KAFKA_0_10_1_IV0,
     // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
     "0.10.1-IV1" -> KAFKA_0_10_1_IV1,
-    // introduced ListGroupRequest v1 in KIP-79
+    // introduced ListOffsetRequest v1 in KIP-79
     "0.10.1-IV2" -> KAFKA_0_10_1_IV2,
     "0.10.1" -> KAFKA_0_10_1_IV2
 


Mime
View raw message