This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 6ad0bf4 KAFKA-7616; Make MockConsumer only add entries to the partition map returned by poll() if there are any records to return 6ad0bf4 is described below commit 6ad0bf4dabf3d95e382a14b1601abab183cb027b Author: Stig Rohde Døssing AuthorDate: Tue Nov 20 23:16:21 2018 +0530 KAFKA-7616; Make MockConsumer only add entries to the partition map returned by poll() if there are any records to return …eturned by poll() if there are any records to return The MockConsumer behaves unlike the real consumer in that it can return a non-empty ConsumerRecords from poll, that also has a count of 0. This change makes the MockConsumer only add partitions to the ConsumerRecords if there are records to return for those partitions. A unit test in MockConsumerTest demonstrates the issue. Author: Stig Rohde Døssing Reviewers: Manikumar Reddy Closes #5901 from srdo/KAFKA-7616 --- .../org/apache/kafka/clients/consumer/MockConsumer.java | 5 +---- .../apache/kafka/clients/consumer/MockConsumerTest.java | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 4af3ba1..dccb359 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -183,9 +183,6 @@ public class MockConsumer implements Consumer { // update the consumed offset final Map>> results = new HashMap<>(); - for (final TopicPartition topicPartition : records.keySet()) { - results.put(topicPartition, new ArrayList<>()); - } for (Map.Entry>> entry : this.records.entrySet()) { if (!subscriptions.isPaused(entry.getKey())) { @@ -196,7 +193,7 @@ public class MockConsumer implements Consumer { } if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) { - results.get(entry.getKey()).add(rec); + results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec); subscriptions.position(entry.getKey(), rec.offset() + 1); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 1d01eb6..03013e6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -26,8 +26,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; public class MockConsumerTest { @@ -84,4 +86,16 @@ public class MockConsumerTest { assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset()); } + @Test + public void testConsumerRecordsIsEmptyWhenReturningNoRecords() { + TopicPartition partition = new TopicPartition("test", 0); + consumer.assign(Collections.singleton(partition)); + consumer.addRecord(new ConsumerRecord("test", 0, 0, null, null)); + consumer.updateEndOffsets(Collections.singletonMap(partition, 1L)); + consumer.seekToEnd(Collections.singleton(partition)); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); + assertThat(records.count(), is(0)); + assertThat(records.isEmpty(), is(true)); + } + }