kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7616; Make MockConsumer only add entries to the partition map returned by poll() if there are any records to return
Date Wed, 12 Dec 2018 23:34:44 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 6ad0bf4  KAFKA-7616; Make MockConsumer only add entries to the partition map returned
by poll() if there are any records to return
6ad0bf4 is described below

commit 6ad0bf4dabf3d95e382a14b1601abab183cb027b
Author: Stig Rohde Døssing <stigdoessing@gmail.com>
AuthorDate: Tue Nov 20 23:16:21 2018 +0530

    KAFKA-7616; Make MockConsumer only add entries to the partition map returned by poll()
if there are any records to return
    
    …returned by poll() if there are any records to return
    
    Unlike the real consumer, the MockConsumer can return a ConsumerRecords from poll() that is
non-empty (it contains an entry for every partition, each with an empty record list) even though its count is 0.
This change makes the MockConsumer only add partitions to the ConsumerRecords if there are records to return for those partitions.
    
    A unit test in MockConsumerTest demonstrates the issue.
    
    Author: Stig Rohde Døssing <stigdoessing@gmail.com>
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Closes #5901 from srdo/KAFKA-7616
---
 .../org/apache/kafka/clients/consumer/MockConsumer.java    |  5 +----
 .../apache/kafka/clients/consumer/MockConsumerTest.java    | 14 ++++++++++++++
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 4af3ba1..dccb359 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -183,9 +183,6 @@ public class MockConsumer<K, V> implements Consumer<K, V>
{
 
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results =
new HashMap<>();
-        for (final TopicPartition topicPartition : records.keySet()) {
-            results.put(topicPartition, new ArrayList<>());
-        }
 
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry
: this.records.entrySet()) {
             if (!subscriptions.isPaused(entry.getKey())) {
@@ -196,7 +193,7 @@ public class MockConsumer<K, V> implements Consumer<K, V>
{
                     }
 
                     if (assignment().contains(entry.getKey()) && rec.offset() >=
subscriptions.position(entry.getKey())) {
-                        results.get(entry.getKey()).add(rec);
+                        results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
                         subscriptions.position(entry.getKey(), rec.offset() + 1);
                     }
                 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 1d01eb6..03013e6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -26,8 +26,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 
 public class MockConsumerTest {
     
@@ -84,4 +86,16 @@ public class MockConsumerTest {
         assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
     }
 
+    @Test
+    public void testConsumerRecordsIsEmptyWhenReturningNoRecords() {
+        TopicPartition partition = new TopicPartition("test", 0);
+        consumer.assign(Collections.singleton(partition));
+        consumer.addRecord(new ConsumerRecord<String, String>("test", 0, 0, null, null));
+        consumer.updateEndOffsets(Collections.singletonMap(partition, 1L));
+        consumer.seekToEnd(Collections.singleton(partition));
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+        assertThat(records.count(), is(0));
+        assertThat(records.isEmpty(), is(true));
+    }
+
 }


Mime
View raw message