kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5353; baseTimestamp should always have a create timestamp
Date Wed, 31 May 2017 23:17:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk eeb8f6781 -> 647afeff6


KAFKA-5353; baseTimestamp should always have a create timestamp

This makes the case where we build the records from scratch consistent
with the case where we update the batch header "in place". Thanks to
edenhill who found the issue while testing librdkafka.

The reason our tests don’t catch this is that we rely on the maxTimestamp
to compute the record level timestamps if log append time is used.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3177 from ijuma/set-base-sequence-for-log-append-time


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/647afeff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/647afeff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/647afeff

Branch: refs/heads/trunk
Commit: 647afeff6a2e3fd78328f6989e8d9f96bcde5121
Parents: eeb8f67
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Jun 1 00:16:55 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Jun 1 00:16:55 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/DefaultRecordBatch.java |  8 ++++-
 .../common/record/MemoryRecordsBuilder.java     |  8 ++---
 .../kafka/common/record/MutableRecordBatch.java |  8 +++--
 .../apache/kafka/common/record/RecordBatch.java |  6 ++--
 .../scala/unit/kafka/log/LogValidatorTest.scala | 35 +++++++++++++++-----
 5 files changed, 45 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/647afeff/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index bdba860..7a0e530 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -131,7 +131,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements
MutableRe
                     + ", computed crc = " + computeChecksum() + ")");
     }
 
-    private long baseTimestamp() {
+    /**
+     * Get the timestamp of the first record in this batch. It is always the create time
of the record even if the
+     * timestamp type of the batch is log append time.
+     *
+     * @return The base timestamp
+     */
+    public long baseTimestamp() {
         return buffer.getLong(BASE_TIMESTAMP_OFFSET);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/647afeff/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index aaca851..66560ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -331,15 +331,11 @@ public class MemoryRecordsBuilder {
         int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
         int offsetDelta = (int) (lastOffset - baseOffset);
 
-        final long baseTimestamp;
         final long maxTimestamp;
-        if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            baseTimestamp = logAppendTime;
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
             maxTimestamp = logAppendTime;
-        } else {
-            baseTimestamp = this.baseTimestamp;
+        else
             maxTimestamp = this.maxTimestamp;
-        }
 
         DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType,
timestampType,
                 baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional,
isControlBatch,

http://git-wip-us.apache.org/repos/asf/kafka/blob/647afeff/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
index 728b6eb..8049469 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
@@ -32,8 +32,12 @@ public interface MutableRecordBatch extends RecordBatch {
 
     /**
      * Set the max timestamp for this batch. When using log append time, this effectively
overrides the individual
-     * timestamps of all the records contained in the batch. Note that this typically requires
re-computation
-     * of the batch's CRC.
+     * timestamps of all the records contained in the batch. To avoid recompression, the
record fields are not updated
+     * by this method, but clients ignore them if the timestamp time is log append time.
Note that baseTimestamp is not
+     * updated by this method.
+     *
+     * This typically requires re-computation of the batch's CRC.
+     *
      * @param timestampType The timestamp type
      * @param maxTimestamp The maximum timestamp
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/647afeff/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index ef773da..65a6a95 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -80,8 +80,10 @@ public interface RecordBatch extends Iterable<Record> {
     long checksum();
 
     /**
-     * Get the timestamp of this record batch. This is the max timestamp among all records
contained in this batch.
-     * This value is updated during compaction.
+     * Get the max timestamp or log append time of this record batch.
+     *
+     * If the timestamp type is create time, this is the max timestamp among all records
contained in this batch and
+     * the value is updated during compaction.
      *
      * @return The max timestamp
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/647afeff/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index f40745d..3ab9732 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -38,7 +38,7 @@ class LogValidatorTest {
   private def checkLogAppendTimeNonCompressed(magic: Byte) {
     val now = System.currentTimeMillis()
     // The timestamps should be overwritten
-    val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.NONE)
+    val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.NONE)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
       now = now,
@@ -52,7 +52,7 @@ class LogValidatorTest {
       isFromClient = true)
     val validatedRecords = validatedResults.validatedRecords
     assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
-    validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
+    validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L,
batch))
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
     assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
@@ -86,7 +86,7 @@ class LogValidatorTest {
     val validatedRecords = validatedResults.validatedRecords
 
     assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
-    validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
+    validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, -1, batch))
     assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid)
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
     assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size -
1}",
@@ -107,7 +107,7 @@ class LogValidatorTest {
   private def checkLogAppendTimeWithoutRecompression(magic: Byte) {
     val now = System.currentTimeMillis()
     // The timestamps should be overwritten
-    val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.GZIP)
+    val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.GZIP)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
@@ -124,7 +124,7 @@ class LogValidatorTest {
 
     assertEquals("message set size should not change", records.records.asScala.size,
       validatedRecords.records.asScala.size)
-    validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
+    validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L,
batch))
     assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid)
     assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp)
     assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size -
1}",
@@ -176,6 +176,7 @@ class LogValidatorTest {
     for (batch <- validatedRecords.batches.asScala) {
       assertTrue(batch.isValid)
       assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
+      maybeCheckBaseTimestamp(timestampSeq(0), batch)
       assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
       assertEquals(producerEpoch, batch.producerEpoch)
       assertEquals(producerId, batch.producerId)
@@ -237,6 +238,7 @@ class LogValidatorTest {
     for (batch <- validatedRecords.batches.asScala) {
       assertTrue(batch.isValid)
       assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
+      maybeCheckBaseTimestamp(timestampSeq(0), batch)
       assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
       assertEquals(producerEpoch, batch.producerEpoch)
       assertEquals(producerId, batch.producerId)
@@ -280,6 +282,7 @@ class LogValidatorTest {
 
     for (batch <- validatedRecords.batches.asScala) {
       assertTrue(batch.isValid)
+      maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch)
       assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp)
       assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
       assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch)
@@ -316,6 +319,7 @@ class LogValidatorTest {
 
     for (batch <- validatedRecords.batches.asScala) {
       assertTrue(batch.isValid)
+      maybeCheckBaseTimestamp(timestamp, batch)
       assertEquals(timestamp, batch.maxTimestamp)
       assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
       assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch)
@@ -367,6 +371,7 @@ class LogValidatorTest {
     for (batch <- validatedRecords.batches.asScala) {
       assertTrue(batch.isValid)
       assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
+      maybeCheckBaseTimestamp(timestampSeq(0), batch)
       assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
       assertEquals(producerEpoch, batch.producerEpoch)
       assertEquals(producerId, batch.producerId)
@@ -945,13 +950,25 @@ class LogValidatorTest {
     builder.build()
   }
 
-  def validateLogAppendTime(now: Long, batch: RecordBatch) {
+  def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = {
+    batch match {
+      case b: DefaultRecordBatch =>
+        assertEquals(s"Unexpected base timestamp of batch $batch", expected, b.baseTimestamp)
+      case _ => // no-op
+    }
+  }
+
+  /**
+    * expectedLogAppendTime is only checked if batch.magic is V2 or higher
+    */
+  def validateLogAppendTime(expectedLogAppendTime: Long, expectedBaseTimestamp: Long, batch:
RecordBatch) {
     assertTrue(batch.isValid)
-    assertTrue(batch.timestampType() == TimestampType.LOG_APPEND_TIME)
-    assertEquals(s"Timestamp of message $batch should be $now", now, batch.maxTimestamp)
+    assertTrue(batch.timestampType == TimestampType.LOG_APPEND_TIME)
+    assertEquals(s"Unexpected max timestamp of batch $batch", expectedLogAppendTime, batch.maxTimestamp)
+    maybeCheckBaseTimestamp(expectedBaseTimestamp, batch)
     for (record <- batch.asScala) {
       assertTrue(record.isValid)
-      assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp)
+      assertEquals(s"Unexpected timestamp of record $record", expectedLogAppendTime, record.timestamp)
     }
   }
 


Mime
View raw message