kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: add test case for fetching from a compacted topic
Date Thu, 05 Nov 2015 01:25:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ef5d168cc -> 68f42210a


MINOR: add test case for fetching from a compacted topic

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #426 from hachikuji/compacted-topics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68f42210
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68f42210
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68f42210

Branch: refs/heads/trunk
Commit: 68f42210a1c8ce64846ffdc2cdbecc6fa5b87739
Parents: ef5d168
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Nov 4 17:31:34 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 4 17:31:34 2015 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/FetcherTest.java | 29 ++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68f42210/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 8711830..7b1e4cb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -128,6 +128,35 @@ public class FetcherTest {
         }
     }
 
+    @Test
+    public void testFetchNonContinuousRecords() {
+        // if we are fetching from a compacted topic, there may be gaps in the returned records
+        // this test verifies the fetcher updates the current fetched/consumed positions
correctly for this case
+
+        MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+        records.append(15L, "key".getBytes(), "value-1".getBytes());
+        records.append(20L, "key".getBytes(), "value-2".getBytes());
+        records.append(30L, "key".getBytes(), "value-3".getBytes());
+        records.close();
+
+        List<ConsumerRecord<byte[], byte[]>> consumerRecords;
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.seek(tp, 0);
+
+        // normal fetch
+        fetcher.initFetches(cluster);
+        client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L,
0));
+        consumerClient.poll(0);
+        consumerRecords = fetcher.fetchedRecords().get(tp);
+        assertEquals(3, consumerRecords.size());
+        assertEquals(31L, (long) subscriptions.fetched(tp)); // this is the next fetching
position
+        assertEquals(31L, (long) subscriptions.consumed(tp));
+
+        assertEquals(15L, consumerRecords.get(0).offset());
+        assertEquals(20L, consumerRecords.get(1).offset());
+        assertEquals(30L, consumerRecords.get(2).offset());
+    }
+
     @Test(expected = RecordTooLargeException.class)
     public void testFetchRecordTooLarge() {
         subscriptions.assignFromUser(Arrays.asList(tp));


Mime
View raw message