kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/2] kafka git commit: MINOR: Log append validation improvements
Date Fri, 07 Apr 2017 17:30:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a4c50687f -> 5cf64f06a


http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 903c394..5b2c660 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -19,16 +19,15 @@ package kafka.log
 import java.nio.ByteBuffer
 
 import kafka.common.LongRef
-import kafka.message._
+import kafka.message.{DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record._
 import org.junit.Assert._
 import org.junit.Test
-import org.scalatest.junit.JUnitSuite
 
 import scala.collection.JavaConverters._
 
-class LogValidatorTest extends JUnitSuite {
+class LogValidatorTest {
 
   @Test
   def testLogAppendTimeNonCompressedV1() {
@@ -44,9 +43,11 @@ class LogValidatorTest extends JUnitSuite {
       now = now,
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = magic,
-      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      compactedTopic = false,
+      magic = magic,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
     val validatedRecords = validatedResults.validatedRecords
     assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
     validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch))
@@ -74,9 +75,11 @@ class LogValidatorTest extends JUnitSuite {
       now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = targetMagic,
-      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      compactedTopic = false,
+      magic = targetMagic,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
     val validatedRecords = validatedResults.validatedRecords
 
     assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size)
@@ -108,9 +111,11 @@ class LogValidatorTest extends JUnitSuite {
       now = now,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = magic,
-      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      compactedTopic = false,
+      magic = magic,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
     val validatedRecords = validatedResults.validatedRecords
 
     assertEquals("message set size should not change", records.records.asScala.size,
@@ -129,27 +134,32 @@ class LogValidatorTest extends JUnitSuite {
   }
 
   @Test
-  def testCreateTimeNonCompressedV1() {
-    checkCreateTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1)
+  def testNonCompressedV1() {
+    checkNonCompressed(RecordBatch.MAGIC_VALUE_V1)
   }
 
-  def checkCreateTimeNonCompressed(magic: Byte) {
+  private def checkNonCompressed(magic: Byte) {
     val now = System.currentTimeMillis()
     val timestampSeq = Seq(now - 1, now + 1, now)
-    val records =
-      MemoryRecords.withRecords(magic, CompressionType.NONE,
-        new SimpleRecord(timestampSeq(0), "hello".getBytes),
-        new SimpleRecord(timestampSeq(1), "there".getBytes),
-        new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
+    val producerId = if (magic >= RecordBatch.MAGIC_VALUE_V2) 1324L else RecordBatch.NO_PRODUCER_ID
+    val producerEpoch = if (magic >= RecordBatch.MAGIC_VALUE_V2) 10: Short else RecordBatch.NO_PRODUCER_EPOCH
+    val baseSequence = if (magic >= RecordBatch.MAGIC_VALUE_V2) 20 else RecordBatch.NO_SEQUENCE
+    val partitionLeaderEpoch = if (magic >= RecordBatch.MAGIC_VALUE_V2) 40 else RecordBatch.NO_PARTITION_LEADER_EPOCH
+    val records = MemoryRecords.withRecords(magic, 0L, CompressionType.NONE, producerId, producerEpoch, baseSequence,
+      partitionLeaderEpoch, new SimpleRecord(timestampSeq(0), "hello".getBytes),
+      new SimpleRecord(timestampSeq(1), "there".getBytes), new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
+
 
     val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = magic,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      compactedTopic = false,
+      magic = magic,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = partitionLeaderEpoch)
     val validatedRecords = validatingResults.validatedRecords
 
     var i = 0
@@ -157,6 +167,10 @@ class LogValidatorTest extends JUnitSuite {
       assertTrue(batch.isValid)
       assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
       assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
+      assertEquals(producerEpoch, batch.producerEpoch)
+      assertEquals(producerId, batch.producerId)
+      assertEquals(baseSequence, batch.baseSequence)
+      assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch)
       for (record <- batch.asScala) {
         assertTrue(record.isValid)
         assertEquals(timestampSeq(i), record.timestamp)
@@ -169,31 +183,90 @@ class LogValidatorTest extends JUnitSuite {
   }
 
   @Test
-  def testCreateTimeNonCompressedV2() {
-    checkCreateTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2)
+  def testNonCompressedV2() {
+    checkNonCompressed(RecordBatch.MAGIC_VALUE_V2)
+  }
+
+  @Test
+  def testRecompressionV1(): Unit = {
+    checkRecompression(RecordBatch.MAGIC_VALUE_V1)
+  }
+
+  private def checkRecompression(magic: Byte): Unit = {
+    val now = System.currentTimeMillis()
+    val timestampSeq = Seq(now - 1, now + 1, now)
+    val producerId = if (magic >= RecordBatch.MAGIC_VALUE_V2) 1324L else RecordBatch.NO_PRODUCER_ID
+    val producerEpoch = if (magic >= RecordBatch.MAGIC_VALUE_V2) 10: Short else RecordBatch.NO_PRODUCER_EPOCH
+    val baseSequence = if (magic >= RecordBatch.MAGIC_VALUE_V2) 20 else RecordBatch.NO_SEQUENCE
+    val partitionLeaderEpoch = if (magic >= RecordBatch.MAGIC_VALUE_V2) 40 else RecordBatch.NO_PARTITION_LEADER_EPOCH
+    val records = MemoryRecords.withRecords(magic, 0L, CompressionType.NONE, producerId, producerEpoch, baseSequence,
+      partitionLeaderEpoch, new SimpleRecord(timestampSeq(0), "hello".getBytes),
+      new SimpleRecord(timestampSeq(1), "there".getBytes), new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
+
+
+    val validatingResults = LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(0),
+      now = System.currentTimeMillis(),
+      sourceCodec = NoCompressionCodec,
+      targetCodec = GZIPCompressionCodec,
+      compactedTopic = false,
+      magic = magic,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = partitionLeaderEpoch)
+    val validatedRecords = validatingResults.validatedRecords
+
+    var i = 0
+    for (batch <- validatedRecords.batches.asScala) {
+      assertTrue(batch.isValid)
+      assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
+      assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
+      assertEquals(producerEpoch, batch.producerEpoch)
+      assertEquals(producerId, batch.producerId)
+      assertEquals(baseSequence, batch.baseSequence)
+      assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch)
+      for (record <- batch.asScala) {
+        assertTrue(record.isValid)
+        assertEquals(timestampSeq(i), record.timestamp)
+        i += 1
+      }
+    }
+    assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
+    assertEquals("Offset of max timestamp should be 2", 2, validatingResults.shallowOffsetOfMaxTimestamp)
+    assertTrue("Message size should have been changed", validatingResults.messageSizeMaybeChanged)
+  }
+
+  @Test
+  def testRecompressionV2(): Unit = {
+    checkRecompression(RecordBatch.MAGIC_VALUE_V2)
   }
 
   @Test
   def testCreateTimeUpConversionV0ToV1(): Unit = {
-    checkCreateTimeUpConvertionFromV0(RecordBatch.MAGIC_VALUE_V1)
+    checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V1)
   }
 
-  private def checkCreateTimeUpConvertionFromV0(toMagic: Byte) {
+  private def checkCreateTimeUpConversionFromV0(toMagic: Byte) {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(0),
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = toMagic,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      magic = toMagic,
+      compactedTopic = false,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
     val validatedRecords = validatedResults.validatedRecords
 
     for (batch <- validatedRecords.batches.asScala) {
       assertTrue(batch.isValid)
       assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp)
       assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+      assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch)
+      assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId)
+      assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}", RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestamp)
     assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
@@ -203,7 +276,7 @@ class LogValidatorTest extends JUnitSuite {
 
   @Test
   def testCreateTimeUpConversionV0ToV2() {
-    checkCreateTimeUpConvertionFromV0(RecordBatch.MAGIC_VALUE_V2)
+    checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V2)
   }
 
   @Test
@@ -215,15 +288,20 @@ class LogValidatorTest extends JUnitSuite {
       now = timestamp,
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      compactedTopic = false,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
     val validatedRecords = validatedResults.validatedRecords
 
     for (batch <- validatedRecords.batches.asScala) {
       assertTrue(batch.isValid)
       assertEquals(timestamp, batch.maxTimestamp)
       assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
+      assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch)
+      assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId)
+      assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(timestamp, validatedResults.maxTimestamp)
     assertEquals(s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}",
@@ -232,27 +310,31 @@ class LogValidatorTest extends JUnitSuite {
   }
 
   @Test
-  def testCreateTimeCompressedV1() {
-    createCreateTimeCompressed(RecordBatch.MAGIC_VALUE_V1)
+  def testCompressedV1() {
+    checkCompressed(RecordBatch.MAGIC_VALUE_V1)
   }
 
-  def createCreateTimeCompressed(magic: Byte) {
+  private def checkCompressed(magic: Byte) {
     val now = System.currentTimeMillis()
     val timestampSeq = Seq(now - 1, now + 1, now)
-    val records =
-      MemoryRecords.withRecords(magic, CompressionType.GZIP,
-        new SimpleRecord(timestampSeq(0), "hello".getBytes),
-        new SimpleRecord(timestampSeq(1), "there".getBytes),
-        new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
+    val producerId = if (magic >= RecordBatch.MAGIC_VALUE_V2) 1324L else RecordBatch.NO_PRODUCER_ID
+    val producerEpoch = if (magic >= RecordBatch.MAGIC_VALUE_V2) 10: Short else RecordBatch.NO_PRODUCER_EPOCH
+    val baseSequence = if (magic >= RecordBatch.MAGIC_VALUE_V2) 20 else RecordBatch.NO_SEQUENCE
+    val partitionLeaderEpoch = if (magic >= RecordBatch.MAGIC_VALUE_V2) 40 else RecordBatch.NO_PARTITION_LEADER_EPOCH
+    val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, producerId, producerEpoch, baseSequence,
+      partitionLeaderEpoch, new SimpleRecord(timestampSeq(0), "hello".getBytes),
+      new SimpleRecord(timestampSeq(1), "there".getBytes), new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
 
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
         offsetCounter = new LongRef(0),
         now = System.currentTimeMillis(),
         sourceCodec = DefaultCompressionCodec,
         targetCodec = DefaultCompressionCodec,
-        messageFormatVersion = magic,
-        messageTimestampType = TimestampType.CREATE_TIME,
-        messageTimestampDiffMaxMs = 1000L)
+        magic = magic,
+        compactedTopic = false,
+        timestampType = TimestampType.CREATE_TIME,
+        timestampDiffMaxMs = 1000L,
+        partitionLeaderEpoch = partitionLeaderEpoch)
     val validatedRecords = validatedResults.validatedRecords
 
     var i = 0
@@ -260,6 +342,10 @@ class LogValidatorTest extends JUnitSuite {
       assertTrue(batch.isValid)
       assertEquals(batch.timestampType, TimestampType.CREATE_TIME)
       assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max)
+      assertEquals(producerEpoch, batch.producerEpoch)
+      assertEquals(producerId, batch.producerId)
+      assertEquals(baseSequence, batch.baseSequence)
+      assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch)
       for (record <- batch.asScala) {
         assertTrue(record.isValid)
         assertEquals(timestampSeq(i), record.timestamp)
@@ -273,8 +359,8 @@ class LogValidatorTest extends JUnitSuite {
   }
 
   @Test
-  def testCreateTimeCompressedV2() {
-    createCreateTimeCompressed(RecordBatch.MAGIC_VALUE_V2)
+  def testCompressedV2() {
+    checkCompressed(RecordBatch.MAGIC_VALUE_V2)
   }
 
   @Test(expected = classOf[InvalidTimestampException])
@@ -288,9 +374,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
   }
 
   @Test(expected = classOf[InvalidTimestampException])
@@ -304,9 +392,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
   }
 
   @Test(expected = classOf[InvalidTimestampException])
@@ -320,9 +410,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      compactedTopic = false,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
   }
 
   @Test(expected = classOf[InvalidTimestampException])
@@ -336,9 +428,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 1000L)
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      compactedTopic = false,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
   }
 
   @Test
@@ -351,9 +445,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+      magic = RecordBatch.MAGIC_VALUE_V0,
+      compactedTopic = false,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -366,9 +462,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V0,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -382,9 +480,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords
     checkOffsets(messageWithOffset, offset)
   }
 
@@ -399,9 +499,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords
     checkOffsets(messageWithOffset, offset)
   }
 
@@ -417,9 +519,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
@@ -435,9 +539,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
@@ -451,9 +557,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -466,9 +574,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
-      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -481,9 +591,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -496,9 +608,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
-      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -512,9 +626,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V0,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -528,9 +644,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V0,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -543,9 +661,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
-      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -558,9 +678,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V2,
-      messageTimestampType = TimestampType.LOG_APPEND_TIME,
-      messageTimestampDiffMaxMs = 1000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V2,
+      timestampType = TimestampType.LOG_APPEND_TIME,
+      timestampDiffMaxMs = 1000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -574,9 +696,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -590,9 +714,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -606,9 +732,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = NoCompressionCodec,
       targetCodec = NoCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V0,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test
@@ -622,9 +750,11 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = DefaultCompressionCodec,
       targetCodec = DefaultCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V0,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L).validatedRecords, offset)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V0,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH).validatedRecords, offset)
   }
 
   @Test(expected = classOf[InvalidRecordException])
@@ -636,13 +766,15 @@ class LogValidatorTest extends JUnitSuite {
       now = System.currentTimeMillis(),
       sourceCodec = SnappyCompressionCodec,
       targetCodec = SnappyCompressionCodec,
-      messageFormatVersion = RecordBatch.MAGIC_VALUE_V1,
-      messageTimestampType = TimestampType.CREATE_TIME,
-      messageTimestampDiffMaxMs = 5000L)
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH)
   }
 
-  private def createRecords(magicValue: Byte = Message.CurrentMagicValue,
-                            timestamp: Long = Message.NoTimestamp,
+  private def createRecords(magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
+                            timestamp: Long = RecordBatch.NO_TIMESTAMP,
                             codec: CompressionType = CompressionType.NONE): MemoryRecords = {
     val buf = ByteBuffer.allocate(512)
     val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf64f06/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index a09e1cc..4edfbaf 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -18,7 +18,6 @@
 package kafka.server.epoch
 
 import java.io.{File, RandomAccessFile}
-import java.util
 import java.util.Properties
 
 import kafka.admin.AdminUtils
@@ -407,4 +406,4 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
     override def close(): Unit = {}
   }
-}
\ No newline at end of file
+}


Mime
View raw message