This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 9328a13 KAFKA-7616; Make MockConsumer only add entries to the partition map returned
by poll() if there are any records to return
9328a13 is described below
commit 9328a137dfbe56fc2e8ad41540ed02f29d778e83
Author: Stig Rohde Døssing <stigdoessing@gmail.com>
AuthorDate: Tue Nov 20 23:16:21 2018 +0530
KAFKA-7616; Make MockConsumer only add entries to the partition map returned by poll()
if there are any records to return
Unlike the real consumer, the MockConsumer can return from poll() a ConsumerRecords that is
non-empty (isEmpty() returns false) yet has a count of 0, because it adds an empty record list
for every partition it holds records for. This change makes the MockConsumer only add partitions
to the ConsumerRecords if there are records to return for those partitions.
A unit test in MockConsumerTest demonstrates the issue.
Author: Stig Rohde Døssing <stigdoessing@gmail.com>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Closes #5901 from srdo/KAFKA-7616
---
.../org/apache/kafka/clients/consumer/MockConsumer.java | 5 +----
.../apache/kafka/clients/consumer/MockConsumerTest.java | 14 ++++++++++++++
2 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 6f455e4..452ad84 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -183,9 +183,6 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
// update the consumed offset
final Map<TopicPartition, List<ConsumerRecord<K, V>>> results =
new HashMap<>();
- for (final TopicPartition topicPartition : records.keySet()) {
- results.put(topicPartition, new ArrayList<ConsumerRecord<K, V>>());
- }
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry
: this.records.entrySet()) {
if (!subscriptions.isPaused(entry.getKey())) {
@@ -196,7 +193,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
if (assignment().contains(entry.getKey()) && rec.offset() >=
subscriptions.position(entry.getKey())) {
- results.get(entry.getKey()).add(rec);
+ results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
subscriptions.position(entry.getKey(), rec.offset() + 1);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 1d01eb6..03013e6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -26,8 +26,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
public class MockConsumerTest {
@@ -84,4 +86,16 @@ public class MockConsumerTest {
assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
}
+ @Test
+ public void testConsumerRecordsIsEmptyWhenReturningNoRecords() {
+ TopicPartition partition = new TopicPartition("test", 0);
+ consumer.assign(Collections.singleton(partition));
+ consumer.addRecord(new ConsumerRecord<String, String>("test", 0, 0, null, null));
+ consumer.updateEndOffsets(Collections.singletonMap(partition, 1L));
+ consumer.seekToEnd(Collections.singleton(partition));
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+ assertThat(records.count(), is(0));
+ assertThat(records.isEmpty(), is(true));
+ }
+
}
|