kafka-commits mailing list archives

From: jjko...@apache.org
Subject: kafka git commit: KAFKA-3179; Fix seek on compressed messages
Date: Fri, 05 Feb 2016 01:24:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 c6ecc7809 -> c3f575d5f


KAFKA-3179; Fix seek on compressed messages

The fix itself is simple.

Some explanation on the unit tests: currently the vast majority of unit tests run
with uncompressed messages. I initially considered running all the tests with
compressed messages, but uncompressed messages are necessary in many test cases
because we need the bytes sent and appended to the log to be predictable. In most
other cases it does not matter whether the messages are compressed or not, and
compression would slow down the unit tests. So I just added one method in
BaseConsumerTest to send compressed messages whenever we need them.
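
To make the failure mode concrete: a compressed message set travels from the broker
as a single unit, so a fetch issued after seek(tp, 5) can still hand the client
entries with offsets 0..9. The guard can be modeled as a minimal sketch with plain
Java collections; the types and numbers below are illustrative, not the real
Fetcher internals.

    import java.util.ArrayList;
    import java.util.List;

    public class SeekFilterSketch {
        public static void main(String[] args) {
            // Offsets recovered by decompressing a single compressed message set.
            List<Long> decompressed = new ArrayList<Long>();
            for (long o = 0; o < 10; o++)
                decompressed.add(o);

            long position = 5L; // consumer position after seek(tp, 5)
            List<Long> parsed = new ArrayList<Long>();
            for (long offset : decompressed) {
                if (offset >= position) // mirrors the guard added in Fetcher.java
                    parsed.add(offset);
            }
            System.out.println(parsed); // prints [5, 6, 7, 8, 9]
        }
    }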

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Aditya Auradkar <aauradkar@linkedin.com>, Ismael Juma <ismael@juma.me.uk>, Joel Koshy <jjkoshy.w@gmail.com>

Closes #842 from becketqin/KAFKA-3179


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c3f575d5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c3f575d5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c3f575d5

Branch: refs/heads/0.9.0
Commit: c3f575d5f248cd60daada7037838d0c7c5be1277
Parents: c6ecc78
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Thu Feb 4 16:08:21 2016 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Feb 4 16:53:25 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  7 +++-
 .../clients/consumer/internals/FetcherTest.java | 10 ++++-
 .../kafka/api/BaseConsumerTest.scala            | 31 ++++++++++------
 .../kafka/api/PlaintextConsumerTest.scala       | 39 ++++++++++++++++++--
 4 files changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f575d5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c5d1bfe..e894102 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -561,8 +561,11 @@ public class Fetcher<K, V> {
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                 for (LogEntry logEntry : records) {
-                    parsed.add(parseRecord(tp, logEntry));
-                    bytes += logEntry.size();
+                    // Skip the messages earlier than current position.
+                    if (logEntry.offset() >= position) {
+                        parsed.add(parseRecord(tp, logEntry));
+                        bytes += logEntry.size();
+                    }
                 }
 
                 if (!parsed.isEmpty()) {
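
From the application's point of view, the guard above means poll() never returns
records below the sought position, even when that position lands inside a compressed
message set. A minimal sketch of that contract, assuming a consumer and partition
already exist and using 5L as an arbitrary seek target:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    final class SeekBehaviorCheck {
        static void verifySeek(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 5L);
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(1000L).records(tp)) {
                // Before the fix, offsets 0..4 of the compressed set could leak through.
                if (record.offset() < 5L)
                    throw new AssertionError("saw pre-seek offset " + record.offset());
            }
        }
    }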

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f575d5/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0a976fe..57fc67f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -466,8 +466,16 @@ public class FetcherTest {
 
         // normal fetch
         for (int i = 1; i < 4; i++) {
+            // We need to make sure the message offset grows. Otherwise they will be considered as already consumed
+            // and filtered out by consumer.
+            if (i > 1) {
+                this.records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+                for (int v = 0; v < 3; v++) {
+                    this.records.append((long) i * 3 + v, "key".getBytes(), String.format("value-%d", v).getBytes());
+                }
+                this.records.close();
+            }
             fetcher.initFetches(cluster);
-
             client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
             consumerClient.poll(0);
             records = fetcher.fetchedRecords().get(tp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f575d5/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 819e690..eb24706 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -15,7 +15,7 @@ package kafka.api
 import java.util
 
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 
@@ -73,7 +73,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(0, this.consumers(0).assignment.size)
     this.consumers(0).assign(List(tp).asJava)
     assertEquals(1, this.consumers(0).assignment.size)
-    
+
     this.consumers(0).seek(tp, 0)
     consumeAndVerifyRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
 
@@ -143,20 +143,20 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).poll(50)
     val pos1 = this.consumers(0).position(tp)
     val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
+    this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
     assertEquals(3, this.consumers(0).committed(tp).offset)
     assertNull(this.consumers(0).committed(tp2))
 
     // positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
+    this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
     assertEquals(3, this.consumers(0).committed(tp).offset)
     assertEquals(5, this.consumers(0).committed(tp2).offset)
 
     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
+    this.consumers(0).commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
     awaitCommitCallback(this.consumers(0), commitCallback)
     assertEquals(7, this.consumers(0).committed(tp2).offset)
   }
@@ -259,10 +259,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
     var callsToAssigned = 0
     var callsToRevoked = 0
+
     def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsAssigned called.")
       callsToAssigned += 1
     }
+
     def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsRevoked called.")
       callsToRevoked += 1
@@ -274,13 +276,20 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   protected def sendRecords(numRecords: Int, tp: TopicPartition) {
-    (0 until numRecords).map { i =>
-      this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
-    }.foreach(_.get)
+    sendRecords(this.producers(0), numRecords, tp)
+  }
+
+  protected def sendRecords(producer: Producer[Array[Byte], Array[Byte]],
+                            numRecords: Int,
+                            tp: TopicPartition) {
+    (0 until numRecords).foreach { i =>
+      producer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
+    }
+    producer.flush()
   }
 
   protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
-                                      startingKeyAndValueIndex: Int = 0) {
+                                      startingKeyAndValueIndex: Int = 0, tp: TopicPartition = tp) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
     val maxIters = numRecords * 300
     var iters = 0
@@ -294,8 +303,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       val offset = startingOffset + i
-      assertEquals(topic, record.topic())
-      assertEquals(part, record.partition())
+      assertEquals(tp.topic(), record.topic())
+      assertEquals(tp.partition(), record.partition())
       assertEquals(offset.toLong, record.offset())
       val keyAndValueIndex = startingKeyAndValueIndex + i
       assertEquals(s"key $keyAndValueIndex", new String(record.key()))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f575d5/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 90e9562..c92b083 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -12,13 +12,15 @@
   */
 package kafka.api
 
+import java.util.Properties
 import java.util.regex.Pattern
 
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException}
 import org.junit.Assert._
@@ -266,7 +268,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testSeek() {
     val consumer = this.consumers(0)
     val totalRecords = 50L
-    sendRecords(totalRecords.toInt)
+    val mid = totalRecords / 2
+
+    // Test seek non-compressed message
+    sendRecords(totalRecords.toInt, tp)
     consumer.assign(List(tp).asJava)
 
     consumer.seekToEnd(tp)
@@ -277,10 +282,36 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(0, consumer.position(tp), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
 
-    val mid = totalRecords / 2
     consumer.seek(tp, mid)
     assertEquals(mid, consumer.position(tp))
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
+
+    // Test seek compressed message
+    sendCompressedMessages(totalRecords.toInt, tp2)
+    consumer.assign(List(tp2).asJava)
+
+    consumer.seekToEnd(tp2)
+    assertEquals(totalRecords, consumer.position(tp2))
+    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
+
+    consumer.seekToBeginning(tp2)
+    assertEquals(0, consumer.position(tp2), 0)
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2)
+
+    consumer.seek(tp2, mid)
+    assertEquals(mid, consumer.position(tp2))
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
+      tp = tp2)
+  }
+
+  private def sendCompressedMessages(numRecords: Int, tp: TopicPartition) {
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name)
+    producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString)
+    val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
+        retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
+    sendRecords(producer, numRecords, tp)
+    producer.close()
   }
 
   def testPositionAndCommit() {
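
A note on sendCompressedMessages above: gzip together with an effectively infinite
linger.ms keeps records in the producer's accumulator until flush(), so the test
records land in as few compressed message sets as possible, which is exactly the
shape that exposes the seek bug. A rough plain-Java equivalent of that setup (the
broker address, topic, and record count are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CompressedSendSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
            // Hold records in the accumulator until flush() so they batch together.
            props.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArraySerializer");

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            for (int i = 0; i < 50; i++)
                producer.send(new ProducerRecord<byte[], byte[]>(
                    "test-topic", 0, ("key " + i).getBytes(), ("value " + i).getBytes()));
            producer.flush(); // forces the lingering batches out as compressed sets
            producer.close();
        }
    }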

