kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9233: Fix IllegalStateException in Fetcher retrieval of beginning or end offsets for duplicate TopicPartition values (#7755)
Date Wed, 01 Apr 2020 18:10:50 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 4b2268b  KAFKA-9233: Fix IllegalStateException in Fetcher retrieval of beginning or end offsets for duplicate TopicPartition values (#7755)
4b2268b is described below

commit 4b2268bd296e348f5a1cbe02cfc763167ea304e2
Author: Andrew Olson <930946+noslowerdna@users.noreply.github.com>
AuthorDate: Wed Apr 1 11:45:45 2020 -0500

    KAFKA-9233: Fix IllegalStateException in Fetcher retrieval of beginning or end offsets for duplicate TopicPartition values (#7755)
    
    Co-authored-by: Andrew Olson <aolson1@cerner.com>
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  1 +
 .../clients/consumer/internals/FetcherTest.java    | 81 ++++++++++++++++++++++
 2 files changed, 82 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 8e474fa..3537f60 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -570,6 +570,7 @@ public class Fetcher<K, V> implements Closeable {
         metadata.addTransientTopics(topicsForPartitions(partitions));
         try {
             Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
+                    .distinct()
                     .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
 
             ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, timer, false);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b757e88..72e1370 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -118,6 +118,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
@@ -3909,6 +3912,74 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
     }
 
+    @Test
+    public void testBeginningOffsets() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        client.prepareResponse(listOffsetResponse(tp0, Errors.NONE, ListOffsetRequest.EARLIEST_TIMESTAMP, 2L));
+        assertEquals(singletonMap(tp0, 2L), fetcher.beginningOffsets(singleton(tp0), time.timer(5000L)));
+    }
+
+    @Test
+    public void testBeginningOffsetsDuplicateTopicPartition() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        client.prepareResponse(listOffsetResponse(tp0, Errors.NONE, ListOffsetRequest.EARLIEST_TIMESTAMP, 2L));
+        assertEquals(singletonMap(tp0, 2L), fetcher.beginningOffsets(asList(tp0, tp0), time.timer(5000L)));
+    }
+
+    @Test
+    public void testBeginningOffsetsMultipleTopicPartitions() {
+        buildFetcher();
+        Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(tp0, 2L);
+        expectedOffsets.put(tp1, 4L);
+        expectedOffsets.put(tp2, 6L);
+        assignFromUser(expectedOffsets.keySet());
+        client.prepareResponse(listOffsetResponse(Errors.NONE, ListOffsetRequest.EARLIEST_TIMESTAMP, expectedOffsets));
+        assertEquals(expectedOffsets, fetcher.beginningOffsets(asList(tp0, tp1, tp2), time.timer(5000L)));
+    }
+
+    @Test
+    public void testBeginningOffsetsEmpty() {
+        buildFetcher();
+        assertEquals(emptyMap(), fetcher.beginningOffsets(emptyList(), time.timer(5000L)));
+    }
+
+    @Test
+    public void testEndOffsets() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        client.prepareResponse(listOffsetResponse(tp0, Errors.NONE, ListOffsetRequest.LATEST_TIMESTAMP, 5L));
+        assertEquals(singletonMap(tp0, 5L), fetcher.endOffsets(singleton(tp0), time.timer(5000L)));
+    }
+
+    @Test
+    public void testEndOffsetsDuplicateTopicPartition() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        client.prepareResponse(listOffsetResponse(tp0, Errors.NONE, ListOffsetRequest.LATEST_TIMESTAMP, 5L));
+        assertEquals(singletonMap(tp0, 5L), fetcher.endOffsets(asList(tp0, tp0), time.timer(5000L)));
+    }
+
+    @Test
+    public void testEndOffsetsMultipleTopicPartitions() {
+        buildFetcher();
+        Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
+        expectedOffsets.put(tp0, 5L);
+        expectedOffsets.put(tp1, 7L);
+        expectedOffsets.put(tp2, 9L);
+        assignFromUser(expectedOffsets.keySet());
+        client.prepareResponse(listOffsetResponse(Errors.NONE, ListOffsetRequest.LATEST_TIMESTAMP, expectedOffsets));
+        assertEquals(expectedOffsets, fetcher.endOffsets(asList(tp0, tp1, tp2), time.timer(5000L)));
+    }
+
+    @Test
+    public void testEndOffsetsEmpty() {
+        buildFetcher();
+        assertEquals(emptyMap(), fetcher.endOffsets(emptyList(), time.timer(5000L)));
+    }
+
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
         // matches any list offset request with the provided timestamp
         return body -> {
@@ -3930,6 +4001,16 @@ public class FetcherTest {
         return new ListOffsetResponse(allPartitionData);
     }
 
+    private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, Map<TopicPartition, Long> offsets) {
+        Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
+        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
+            ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error, timestamp,
+                entry.getValue(), Optional.empty());
+            allPartitionData.put(entry.getKey(), partitionData);
+        }
+        return new ListOffsetResponse(allPartitionData);
+    }
+
     private FetchResponse<MemoryRecords> fullFetchResponseWithAbortedTransactions(MemoryRecords records,
                                                                                   List<FetchResponse.AbortedTransaction> abortedTransactions,
                                                                                   Errors error,


Mime
View raw message