kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/2] kafka git commit: KAFKA-5663; Pass non-null logDirFailureChannel to Log.apply
Date Wed, 02 Aug 2017 22:54:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 125d69cae -> fb21209b5


http://git-wip-us.apache.org/repos/asf/kafka/blob/fb21209b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 13ce19d..ad64e39 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -23,12 +23,11 @@ import java.nio.file.Files
 import java.util.Properties
 
 import org.apache.kafka.common.errors._
-import kafka.api.ApiVersion
 import kafka.common.KafkaException
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import kafka.utils._
-import kafka.server.{BrokerTopicStats, KafkaConfig}
+import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -36,7 +35,7 @@ import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.IsolationLevel
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.EasyMock
 
 import scala.collection.JavaConverters._
@@ -46,9 +45,8 @@ class LogTest {
 
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-  val time = new MockTime()
+  val mockTime = new MockTime()
   var config: KafkaConfig = null
-  val logConfig = LogConfig()
   val brokerTopicStats = new BrokerTopicStats
 
   @Before
@@ -94,23 +92,13 @@ class LogTest {
   @Test
   def testTimeBasedLogRoll() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
-    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
+    val logConfig = createLogConfig(segmentMs = 1 * 60 * 60L)
 
     // create a log
-    val log = Log(logDir,
-      LogConfig(logProps),
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      maxProducerIdExpirationMs = 24 * 60,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     // Test the segment rolling behavior when messages do not have a timestamp.
-    time.sleep(log.config.segmentMs + 1)
+    mockTime.sleep(log.config.segmentMs + 1)
     log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments)
 
@@ -118,56 +106,48 @@ class LogTest {
     assertEquals("Log rolls on this append since time has expired.", 2, log.numberOfSegments)
 
     for (numSegments <- 3 until 5) {
-      time.sleep(log.config.segmentMs + 1)
+      mockTime.sleep(log.config.segmentMs + 1)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
       assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
     }
 
     // Append a message with timestamp to a segment whose first message do not have a timestamp.
-    val timestamp = time.milliseconds + log.config.segmentMs + 1
+    val timestamp = mockTime.milliseconds + log.config.segmentMs + 1
     def createRecordsWithTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = timestamp)
     log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0)
     assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments)
 
     // Test the segment rolling behavior when messages have timestamps.
-    time.sleep(log.config.segmentMs + 1)
+    mockTime.sleep(log.config.segmentMs + 1)
     log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0)
     assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments)
 
     // move the wall clock beyond log rolling time
-    time.sleep(log.config.segmentMs + 1)
+    mockTime.sleep(log.config.segmentMs + 1)
     log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0)
     assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments)
 
-    val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     log.appendAsLeader(recordWithExpiredTimestamp, leaderEpoch = 0)
     assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments)
 
     val numSegments = log.numberOfSegments
-    time.sleep(log.config.segmentMs + 1)
+    mockTime.sleep(log.config.segmentMs + 1)
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE), leaderEpoch = 0)
     assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
   @Test(expected = classOf[OutOfOrderSequenceException])
   def testNonSequentialAppend(): Unit = {
-    val logProps = new Properties()
-
     // create a log
-    val log = Log(logDir,
-      LogConfig(logProps),
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      time = time,
-      brokerTopicStats = new BrokerTopicStats)
-
+    val log = createLog(logDir, LogConfig())
     val pid = 1L
     val epoch: Short = 0
 
-    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 0)
+    val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 0)
     log.appendAsLeader(records, leaderEpoch = 0)
 
-    val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 2)
+    val nextRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 2)
     log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
@@ -175,12 +155,12 @@ class LogTest {
   def testInitializationOfProducerSnapshotsUpgradePath(): Unit = {
     // simulate the upgrade path by creating a new log with several segments, deleting the
     // snapshot files, and then reloading the log
-
-    val log = createLog(64, messagesPerSegment = 10)
+    val logConfig = createLogConfig(segmentBytes = 64 * 10)
+    val log = createLog(logDir, logConfig)
     assertEquals(None, log.oldestProducerSnapshotOffset)
 
     for (i <- 0 to 100) {
-      val record = new SimpleRecord(time.milliseconds, i.toString.getBytes)
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
       log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
     }
 
@@ -191,7 +171,7 @@ class LogTest {
       Files.delete(file.toPath)
     }
 
-    val reloadedLog = createLog(64, messagesPerSegment = 10)
+    val reloadedLog = createLog(logDir, logConfig)
     val expectedSnapshotsOffsets = log.logSegments.toSeq.reverse.take(2).map(_.baseOffset) ++ Seq(reloadedLog.logEndOffset)
     expectedSnapshotsOffsets.foreach { offset =>
       assertTrue(Log.producerSnapshotFile(logDir, offset).exists)
@@ -213,8 +193,9 @@ class LogTest {
 
   @Test
   def testPidMapOffsetUpdatedForNonIdempotentData() {
-    val log = createLog(2048)
-    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)))
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
+    val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)))
     log.appendAsLeader(records, leaderEpoch = 0)
     log.takeProducerSnapshot()
     assertEquals(Some(1), log.latestProducerSnapshotOffset)
@@ -248,9 +229,9 @@ class LogTest {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      scheduler = time.scheduler,
+      scheduler = mockTime.scheduler,
       brokerTopicStats = brokerTopicStats,
-      time = time,
+      time = mockTime,
       maxProducerIdExpirationMs = 300000,
       producerIdExpirationCheckIntervalMs = 30000,
       topicPartition = Log.parseTopicPartitionName(logDir),
@@ -317,9 +298,9 @@ class LogTest {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      scheduler = time.scheduler,
+      scheduler = mockTime.scheduler,
       brokerTopicStats = brokerTopicStats,
-      time = time,
+      time = mockTime,
       maxProducerIdExpirationMs = 300000,
       producerIdExpirationCheckIntervalMs = 30000,
       topicPartition = Log.parseTopicPartitionName(logDir),
@@ -352,9 +333,9 @@ class LogTest {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      scheduler = time.scheduler,
+      scheduler = mockTime.scheduler,
       brokerTopicStats = brokerTopicStats,
-      time = time,
+      time = mockTime,
       maxProducerIdExpirationMs = 300000,
       producerIdExpirationCheckIntervalMs = 30000,
       topicPartition = Log.parseTopicPartitionName(logDir),
@@ -388,9 +369,9 @@ class LogTest {
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
-      scheduler = time.scheduler,
+      scheduler = mockTime.scheduler,
       brokerTopicStats = brokerTopicStats,
-      time = time,
+      time = mockTime,
       maxProducerIdExpirationMs = 300000,
       producerIdExpirationCheckIntervalMs = 30000,
       topicPartition = Log.parseTopicPartitionName(logDir),
@@ -403,7 +384,8 @@ class LogTest {
 
   @Test
   def testRebuildPidMapWithCompactedData() {
-    val log = createLog(2048)
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
     val seq = 0
@@ -448,7 +430,8 @@ class LogTest {
 
   @Test
   def testRebuildProducerStateWithEmptyCompactedBatch() {
-    val log = createLog(2048)
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
     val seq = 0
@@ -491,7 +474,8 @@ class LogTest {
 
   @Test
   def testUpdatePidMapWithCompactedData() {
-    val log = createLog(2048)
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
     val seq = 0
@@ -526,7 +510,8 @@ class LogTest {
 
   @Test
   def testPidMapTruncateTo() {
-    val log = createLog(2048)
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0)
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), leaderEpoch = 0)
     log.takeProducerSnapshot()
@@ -546,8 +531,8 @@ class LogTest {
   @Test
   def testPidMapTruncateToWithNoSnapshots() {
     // This ensures that the upgrade optimization path cannot be hit after initial loading
-
-    val log = createLog(2048)
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
     val pid = 1L
     val epoch = 0.toShort
 
@@ -573,14 +558,15 @@ class LogTest {
 
   @Test
   def testLoadProducersAfterDeleteRecordsMidSegment(): Unit = {
-    val log = createLog(2048)
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
     val pid1 = 1L
     val pid2 = 2L
     val epoch = 0.toShort
 
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(time.milliseconds(), "a".getBytes)), producerId = pid1,
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1,
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(time.milliseconds(), "b".getBytes)), producerId = pid2,
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
     assertEquals(2, log.activeProducers.size)
 
@@ -593,7 +579,7 @@ class LogTest {
 
     log.close()
 
-    val reloadedLog = createLog(2048, logStartOffset = 1L)
+    val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
     assertEquals(1, reloadedLog.activeProducers.size)
     val reloadedEntryOpt = log.activeProducers.get(pid2)
     assertEquals(retainedEntryOpt, reloadedEntryOpt)
@@ -601,15 +587,16 @@ class LogTest {
 
   @Test
   def testLoadProducersAfterDeleteRecordsOnSegment(): Unit = {
-    val log = createLog(2048)
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
     val pid1 = 1L
     val pid2 = 2L
     val epoch = 0.toShort
 
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(time.milliseconds(), "a".getBytes)), producerId = pid1,
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1,
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
     log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(time.milliseconds(), "b".getBytes)), producerId = pid2,
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2,
       producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
 
     assertEquals(2, log.logSegments.size)
@@ -627,7 +614,7 @@ class LogTest {
 
     log.close()
 
-    val reloadedLog = createLog(2048, logStartOffset = 1L)
+    val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
     assertEquals(1, reloadedLog.activeProducers.size)
     val reloadedEntryOpt = log.activeProducers.get(pid2)
     assertEquals(retainedEntryOpt, reloadedEntryOpt)
@@ -636,7 +623,8 @@ class LogTest {
   @Test
   def testPidMapTruncateFullyAndStartAt() {
     val records = TestUtils.singletonRecords("foo".getBytes)
-    val log = createLog(records.sizeInBytes, messagesPerSegment = 1, retentionBytes = records.sizeInBytes * 2)
+    val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(records, leaderEpoch = 0)
     log.takeProducerSnapshot()
 
@@ -658,7 +646,8 @@ class LogTest {
   def testPidExpirationOnSegmentDeletion() {
     val pid1 = 1L
     val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0)
-    val log = createLog(records.sizeInBytes, messagesPerSegment = 1, retentionBytes = records.sizeInBytes * 2)
+    val logConfig = createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(records, leaderEpoch = 0)
     log.takeProducerSnapshot()
 
@@ -681,7 +670,8 @@ class LogTest {
 
   @Test
   def testTakeSnapshotOnRollAndDeleteSnapshotOnFlush() {
-    val log = createLog(2048)
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0)
     log.roll(1L)
     assertEquals(Some(1L), log.latestProducerSnapshotOffset)
@@ -710,7 +700,8 @@ class LogTest {
 
   @Test
   def testRebuildTransactionalState(): Unit = {
-    val log = createLog(1024 * 1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
 
     val pid = 137L
     val epoch = 5.toShort
@@ -731,7 +722,7 @@ class LogTest {
 
     log.close()
 
-    val reopenedLog = createLog(1024 * 1024)
+    val reopenedLog = createLog(logDir, logConfig)
     reopenedLog.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1)
     assertEquals(None, reopenedLog.firstUnstableOffset)
   }
@@ -743,57 +734,50 @@ class LogTest {
                             coordinatorEpoch: Int = 0,
                             partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch)
-    MemoryRecords.withEndTransactionMarker(offset, time.milliseconds(), partitionLeaderEpoch, producerId, epoch, marker)
+    MemoryRecords.withEndTransactionMarker(offset, mockTime.milliseconds(), partitionLeaderEpoch, producerId, epoch, marker)
   }
 
   @Test
   def testPeriodicPidExpiration() {
-    val maxPidExpirationMs = 200
-    val expirationCheckInterval = 100
+    val maxProducerIdExpirationMs = 200
+    val producerIdExpirationCheckIntervalMs = 100
 
     val pid = 23L
-    val log = createLog(2048, maxPidExpirationMs = maxPidExpirationMs,
-      pidExpirationCheckIntervalMs = expirationCheckInterval)
-    val records = Seq(new SimpleRecord(time.milliseconds(), "foo".getBytes))
+    val logConfig = createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = maxProducerIdExpirationMs,
+      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs)
+    val records = Seq(new SimpleRecord(mockTime.milliseconds(), "foo".getBytes))
     log.appendAsLeader(TestUtils.records(records, producerId = pid, producerEpoch = 0, sequence = 0), leaderEpoch = 0)
 
     assertEquals(Set(pid), log.activeProducers.keySet)
 
-    time.sleep(expirationCheckInterval)
+    mockTime.sleep(producerIdExpirationCheckIntervalMs)
     assertEquals(Set(pid), log.activeProducers.keySet)
 
-    time.sleep(expirationCheckInterval)
+    mockTime.sleep(producerIdExpirationCheckIntervalMs)
     assertEquals(Set(), log.activeProducers.keySet)
   }
 
   @Test
   def testDuplicateAppends(): Unit = {
-    val logProps = new Properties()
-
     // create a log
-    val log = Log(logDir,
-      LogConfig(logProps),
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
-
+    val log = createLog(logDir, LogConfig())
     val pid = 1L
     val epoch: Short = 0
 
     var seq = 0
     // Pad the beginning of the log.
     for (_ <- 0 to 5) {
-      val record = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
+      val record = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
         producerId = pid, producerEpoch = epoch, sequence = seq)
       log.appendAsLeader(record, leaderEpoch = 0)
       seq = seq + 1
     }
     // Append an entry with multiple log records.
     def createRecords = TestUtils.records(List(
-      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
-      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
-      new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)
+      new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
+      new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
+      new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)
     ), producerId = pid, producerEpoch = epoch, sequence = seq)
     val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0)
     assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1, 3)
@@ -811,8 +795,8 @@ class LogTest {
     try {
       val records = TestUtils.records(
         List(
-          new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
-          new SimpleRecord(time.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)),
+          new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes),
+          new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)),
         producerId = pid, producerEpoch = epoch, sequence = seq - 2)
       log.appendAsLeader(records, leaderEpoch = 0)
       fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
@@ -824,7 +808,7 @@ class LogTest {
     // Append a Duplicate of an entry in the middle of the log. This is not allowed.
      try {
       val records = TestUtils.records(
-        List(new SimpleRecord(time.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
+        List(new SimpleRecord(mockTime.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
         producerId = pid, producerEpoch = epoch, sequence = 1)
       log.appendAsLeader(records, leaderEpoch = 0)
       fail ("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " +
@@ -834,7 +818,7 @@ class LogTest {
     }
 
     // Append a duplicate entry with a single records at the tail of the log. This should return the appendInfo of the original entry.
-    def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)),
+    def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),
       producerId = pid, producerEpoch = epoch, sequence = seq)
     val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0)
     val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0)
@@ -844,36 +828,29 @@ class LogTest {
 
   @Test
   def testMultiplePidsPerMemoryRecord() : Unit = {
-    val logProps = new Properties()
-
     // create a log
-    val log = Log(logDir,
-      LogConfig(logProps),
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    val log = createLog(logDir, LogConfig())
 
     val epoch: Short = 0
     val buffer = ByteBuffer.allocate(512)
 
     var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
-      TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), 1L, epoch, 0, false, 0)
+      TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), 1L, epoch, 0, false, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
-      TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), 2L, epoch, 0, false, 0)
+      TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), 2L, epoch, 0, false, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
-      TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), 3L, epoch, 0, false, 0)
+      TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), 3L, epoch, 0, false, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
-      TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), 4L, epoch, 0, false, 0)
+      TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), 4L, epoch, 0, false, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
@@ -897,7 +874,8 @@ class LogTest {
 
   @Test(expected = classOf[DuplicateSequenceNumberException])
   def testDuplicateAppendToFollower() : Unit = {
-    val log = createLog(1024*1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
     val epoch: Short = 0
     val pid = 1L
     val baseSequence = 0
@@ -912,7 +890,8 @@ class LogTest {
 
   @Test(expected = classOf[DuplicateSequenceNumberException])
   def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = {
-    val log = createLog(1024*1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
 
     val pid1 = 1L
     val pid2 = 2L
@@ -922,31 +901,31 @@ class LogTest {
 
     // pid1 seq = 0
     var builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-      TimestampType.LOG_APPEND_TIME, 0L, time.milliseconds(), pid1, epoch, 0)
+      TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), pid1, epoch, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     // pid2 seq = 0
     builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-      TimestampType.LOG_APPEND_TIME, 1L, time.milliseconds(), pid2, epoch, 0)
+      TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), pid2, epoch, 0)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     // pid1 seq = 1
     builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-      TimestampType.LOG_APPEND_TIME, 2L, time.milliseconds(), pid1, epoch, 1)
+      TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), pid1, epoch, 1)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     // pid2 seq = 1
     builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-      TimestampType.LOG_APPEND_TIME, 3L, time.milliseconds(), pid2, epoch, 1)
+      TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), pid2, epoch, 1)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
     // // pid1 seq = 1 (duplicate)
     builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-      TimestampType.LOG_APPEND_TIME, 4L, time.milliseconds(), pid1, epoch, 1)
+      TimestampType.LOG_APPEND_TIME, 4L, mockTime.milliseconds(), pid1, epoch, 1)
     builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
     builder.close()
 
@@ -961,24 +940,16 @@ class LogTest {
 
   @Test(expected = classOf[ProducerFencedException])
   def testOldProducerEpoch(): Unit = {
-    val logProps = new Properties()
-
     // create a log
-    val log = Log(logDir,
-      LogConfig(logProps),
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
-
+    val log = createLog(logDir, LogConfig())
     val pid = 1L
     val newEpoch: Short = 1
     val oldEpoch: Short = 0
 
-    val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = newEpoch, sequence = 0)
+    val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = newEpoch, sequence = 0)
     log.appendAsLeader(records, leaderEpoch = 0)
 
-    val nextRecords = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = oldEpoch, sequence = 0)
+    val nextRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = oldEpoch, sequence = 0)
     log.appendAsLeader(nextRecords, leaderEpoch = 0)
   }
 
@@ -988,29 +959,20 @@ class LogTest {
    */
   @Test
   def testTimeBasedLogRollJitter() {
-    var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     val maxJitter = 20 * 60L
-
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
-    logProps.put(LogConfig.SegmentJitterMsProp, maxJitter: java.lang.Long)
     // create a log
-    val log = Log(logDir,
-      LogConfig(logProps),
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    val logConfig = createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter)
+    val log = createLog(logDir, logConfig)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     log.appendAsLeader(set, leaderEpoch = 0)
 
-    time.sleep(log.config.segmentMs - maxJitter)
-    set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    mockTime.sleep(log.config.segmentMs - maxJitter)
+    set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     log.appendAsLeader(set, leaderEpoch = 0)
     assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments)
-    time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
-    set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
+    set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     log.appendAsLeader(set, leaderEpoch = 0)
     assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments)
   }
@@ -1020,18 +982,13 @@ class LogTest {
    */
   @Test
   def testSizeBasedLogRoll() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     val setSize = createRecords.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
-
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
-    // We use need to use magic value 1 here because the test is message size sensitive.
-    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     // create a log
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = segmentSize)
+    val log = createLog(logDir, logConfig)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -1046,9 +1003,8 @@ class LogTest {
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
-    log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds), leaderEpoch = 0)
+    val log = createLog(logDir, LogConfig())
+    log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0)
   }
 
   /**
@@ -1056,12 +1012,8 @@ class LogTest {
    */
   @Test
   def testAppendAndReadWithSequentialOffsets() {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
-    // We use need to use magic value 1 here because the test is message size sensitive.
-    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 71)
+    val log = createLog(logDir, logConfig)
     val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
 
     for(value <- values)
@@ -1084,10 +1036,8 @@ class LogTest {
    */
   @Test
   def testAppendAndReadWithNonSequentialOffsets() {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 72)
+    val log = createLog(logDir, logConfig)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -1110,10 +1060,8 @@ class LogTest {
    */
   @Test
   def testReadAtLogGap() {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 300)
+    val log = createLog(logDir, logConfig)
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -1128,10 +1076,8 @@ class LogTest {
 
   @Test
   def testReadWithMinMessage() {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 72)
+    val log = createLog(logDir,  logConfig)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -1157,10 +1103,8 @@ class LogTest {
 
   @Test
   def testReadWithTooSmallMaxLength() {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    val log = Log(logDir,  LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 72)
+    val log = createLog(logDir,  logConfig)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
 
@@ -1192,12 +1136,9 @@ class LogTest {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val logProps = new Properties()
-
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
-    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 1024)
+    val log = createLog(logDir, logConfig)
     log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0)
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0,
@@ -1228,13 +1169,11 @@ class LogTest {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 100)
+    val log = createLog(logDir, logConfig)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
-                                                                                timestamp = time.milliseconds))
+                                                                                timestamp = mockTime.milliseconds))
     messageSets.foreach(log.appendAsLeader(_, leaderEpoch = 0))
     log.flush()
 
@@ -1268,10 +1207,8 @@ class LogTest {
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 110)
+    val log = createLog(logDir, logConfig)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
@@ -1294,13 +1231,10 @@ class LogTest {
     for(messagesToAppend <- List(0, 1, 25)) {
       logDir.mkdirs()
       // first test a log segment starting at 0
-      val logProps = new Properties()
-      logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-      logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
-      val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-        brokerTopicStats = brokerTopicStats, time = time)
+      val logConfig = createLogConfig(segmentBytes = 100, retentionMs = 0)
+      val log = createLog(logDir, logConfig)
       for(i <- 0 until messagesToAppend)
-        log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10), leaderEpoch = 0)
+        log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = mockTime.milliseconds - 10), leaderEpoch = 0)
 
       val currOffset = log.logEndOffset
       assertEquals(currOffset, messagesToAppend)
@@ -1315,7 +1249,7 @@ class LogTest {
       assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
       assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                    currOffset,
-                   log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = time.milliseconds), leaderEpoch = 0).firstOffset)
+                   log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0).firstOffset)
 
       // cleanup the log
       log.delete()
@@ -1331,12 +1265,8 @@ class LogTest {
     val messageSet = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes))
     // append messages to log
     val configSegmentSize = messageSet.sizeInBytes - 1
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
-    // We use need to use magic value 1 here because the test is message size sensitive.
-    logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = configSegmentSize)
+    val log = createLog(logDir, logConfig)
 
     try {
       log.appendAsLeader(messageSet, leaderEpoch = 0)
@@ -1360,11 +1290,8 @@ class LogTest {
     val messageSetWithKeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage)
     val messageSetWithKeyedMessages = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, anotherKeyedMessage)
 
-    val logProps = new Properties()
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(cleanupPolicy = LogConfig.Compact)
+    val log = createLog(logDir, logConfig)
 
     try {
       log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
@@ -1404,10 +1331,8 @@ class LogTest {
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
-    val logProps = new Properties()
-    logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(maxMessageBytes = maxMessageSize)
+    val log = createLog(logDir, logConfig)
 
     // should be able to append the small message
     log.appendAsLeader(first, leaderEpoch = 0)
@@ -1428,16 +1353,11 @@ class LogTest {
     val messageSize = 100
     val segmentSize = 7 * messageSize
     val indexInterval = 3 * messageSize
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
-    val config = LogConfig(logProps)
-    var log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = indexInterval, segmentIndexBytes = 4096)
+    var log = createLog(logDir, logConfig)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
-        timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
+        timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
     val lastIndexOffset = log.activeSegment.index.lastOffset
     val numIndexEntries = log.activeSegment.index.entries
@@ -1460,14 +1380,12 @@ class LogTest {
       assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
     }
 
-    log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    log = createLog(logDir, logConfig, recoveryPoint = lastOffset)
     verifyRecoveredLog(log)
     log.close()
 
     // test recovery case
-    log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    log = createLog(logDir, logConfig)
     verifyRecoveredLog(log)
     log.close()
   }
@@ -1478,22 +1396,17 @@ class LogTest {
   @Test
   def testBuildTimeIndexWhenNotAssigningOffsets() {
     val numMessages = 100
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 10000: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-
-    val config = LogConfig(logProps)
-    val log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 10000, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
 
     val messages = (0 until numMessages).map { i =>
-      MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
+      MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(mockTime.milliseconds + i, i.toString.getBytes()))
     }
     messages.foreach(log.appendAsFollower)
     val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries }
     assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries)
-    assertEquals(s"The last time index entry should have timestamp ${time.milliseconds + numMessages - 1}",
-      time.milliseconds + numMessages - 1, log.activeSegment.timeIndex.lastEntry.timestamp)
+    assertEquals(s"The last time index entry should have timestamp ${mockTime.milliseconds + numMessages - 1}",
+      mockTime.milliseconds + numMessages - 1, log.activeSegment.timeIndex.lastEntry.timestamp)
   }
 
   /**
@@ -1503,15 +1416,10 @@ class LogTest {
   def testIndexRebuild() {
     // publish the messages and close the log
     val numMessages = 200
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-
-    val config = LogConfig(logProps)
-    var log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    var log = createLog(logDir, logConfig)
     for(i <- 0 until numMessages)
-      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
+      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
     val indexFiles = log.logSegments.map(_.index.file)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
@@ -1521,17 +1429,16 @@ class LogTest {
     timeIndexFiles.foreach(_.delete())
 
     // reopen the log
-    log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    log = createLog(logDir, logConfig)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
     assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
+        assertEquals(i, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
     }
     log.close()
   }
@@ -1543,17 +1450,11 @@ class LogTest {
   def testRebuildTimeIndexForOldMessages() {
     val numMessages = 200
     val segmentSize = 200
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-    logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
-
-    val config = LogConfig(logProps)
-    var log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = 1, messageFormatVersion = "0.9.0")
+    var log = createLog(logDir, logConfig)
     for(i <- 0 until numMessages)
       log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10),
-        timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
+        timestamp = mockTime.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
@@ -1561,8 +1462,7 @@ class LogTest {
     timeIndexFiles.foreach(_.delete())
 
     // The rebuilt time index should be empty
-    log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1)
     val segArray = log.logSegments.toArray
     for (i <- segArray.indices.init) {
       assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
@@ -1577,15 +1477,10 @@ class LogTest {
   def testCorruptIndexRebuild() {
     // publish the messages and close the log
     val numMessages = 200
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-
-    val config = LogConfig(logProps)
-    var log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    var log = createLog(logDir, logConfig)
     for(i <- 0 until numMessages)
-      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
+      log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0)
     val indexFiles = log.logSegments.map(_.index.file)
     val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
@@ -1605,15 +1500,14 @@ class LogTest {
     }
 
     // reopen the log
-    log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    log = createLog(logDir, logConfig, recoveryPoint = 200L)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages) {
       assertEquals(i, log.readUncommitted(i, 100, None).records.batches.iterator.next().lastOffset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
+        assertEquals(i, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
     }
     log.close()
   }
@@ -1623,17 +1517,14 @@ class LogTest {
    */
   @Test
   def testTruncateTo() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     val setSize = createRecords.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
 
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
-
     // create a log
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = segmentSize)
+    val log = createLog(logDir, logConfig)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (_ <- 1 to msgPerSeg)
@@ -1681,24 +1572,20 @@ class LogTest {
    */
   @Test
   def testIndexResizingAtTruncation() {
-    val setSize = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds).sizeInBytes
+    val setSize = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds).sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer)
-    val config = LogConfig(logProps)
-    val log = Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val logConfig = createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = setSize - 1)
+    val log = createLog(logDir, logConfig)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
-      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i), leaderEpoch = 0)
+      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
-    time.sleep(msgPerSeg)
+    mockTime.sleep(msgPerSeg)
     for (i<- 1 to msgPerSeg)
-      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i), leaderEpoch = 0)
+      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0)
     assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
     val expectedEntries = msgPerSeg - 1
 
@@ -1710,9 +1597,9 @@ class LogTest {
     assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.index.maxEntries)
     assertEquals("The time index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/12, log.logSegments.toList.head.timeIndex.maxEntries)
 
-    time.sleep(msgPerSeg)
+    mockTime.sleep(msgPerSeg)
     for (i<- 1 to msgPerSeg)
-      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds + i), leaderEpoch = 0)
+      log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
   }
 
@@ -1726,18 +1613,9 @@ class LogTest {
     val bogusIndex2 = Log.offsetIndexFile(logDir, 5)
     val bogusTimeIndex2 = Log.timeIndexFile(logDir, 5)
 
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-    val log = Log(logDir,
-      LogConfig(logProps),
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
 
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
     assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0)
@@ -1756,33 +1634,16 @@ class LogTest {
    */
   @Test
   def testReopenThenTruncate() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
-    val config = LogConfig(logProps)
-
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     // create a log
-    var log = Log(logDir,
-      config,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000)
+    var log = createLog(logDir, logConfig)
 
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for (_ <- 0 until 100)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
     log.close()
-    log = Log(logDir,
-      config,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    log = createLog(logDir, logConfig)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
@@ -1793,23 +1654,11 @@ class LogTest {
    */
   @Test
   def testAsyncDelete() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000L)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000L)
     val asyncDeleteMs = 1000
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
-    logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer)
-    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
-    val config = LogConfig(logProps)
-
-    val log = Log(logDir,
-      config,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000,
+                                    retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -1831,7 +1680,7 @@ class LogTest {
 
     // when enough time passes the files should be deleted
     val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
-    time.sleep(asyncDeleteMs + 1)
+    mockTime.sleep(asyncDeleteMs + 1)
     assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists))
   }
 
@@ -1840,19 +1689,9 @@ class LogTest {
    */
   @Test
   def testOpenDeletesObsoleteFiles() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
-    val config = LogConfig(logProps)
-    var log = Log(logDir,
-      config,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+    var log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -1862,26 +1701,13 @@ class LogTest {
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
     log.close()
-
-    log = Log(logDir,
-      config,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    log = createLog(logDir, logConfig)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
 
   @Test
   def testAppendMessageWithNullPayload() {
-    val log = Log(logDir,
-      LogConfig(),
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    val log = createLog(logDir, LogConfig())
     log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
     val head = log.readUncommitted(0, 4096, None).records.records.iterator.next()
     assertEquals(0, head.offset)
@@ -1890,13 +1716,7 @@ class LogTest {
 
   @Test(expected = classOf[IllegalArgumentException])
   def testAppendWithOutOfOrderOffsetsThrowsException() {
-    val log = Log(logDir,
-      LogConfig(),
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    val log = createLog(logDir, LogConfig())
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
     val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
@@ -1905,13 +1725,7 @@ class LogTest {
 
   @Test
   def testAppendWithNoTimestamp(): Unit = {
-    val log = Log(logDir,
-      LogConfig(),
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    val log = createLog(logDir, LogConfig())
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0)
   }
@@ -1919,23 +1733,13 @@ class LogTest {
   @Test
   def testCorruptLog() {
     // append some messages to create some segments
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-    logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
-    val config = LogConfig(logProps)
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     val recoveryPoint = 50L
     for (_ <- 0 until 10) {
       // create a log and write some messages to it
       logDir.mkdirs()
-      var log = Log(logDir,
-        config,
-        logStartOffset = 0L,
-        recoveryPoint = 0L,
-        scheduler = time.scheduler,
-        brokerTopicStats = brokerTopicStats,
-        time = time)
+      var log = createLog(logDir, logConfig)
       val numMessages = 50 + TestUtils.random.nextInt(50)
       for (_ <- 0 until numMessages)
         log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -1947,7 +1751,7 @@ class LogTest {
       TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
 
       // attempt recovery
-      log = Log(logDir, config, 0L, recoveryPoint, time.scheduler, brokerTopicStats, time)
+      log = createLog(logDir, logConfig, 0L, recoveryPoint)
       assertEquals(numMessages, log.logEndOffset)
 
       val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList
@@ -1968,18 +1772,8 @@ class LogTest {
   @Test
   def testOverCompactedLogRecovery(): Unit = {
     // append some messages to create some segments
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-    val config = LogConfig(logProps)
-    val log = Log(logDir,
-      config,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
     val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
     val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
     val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, 0, new SimpleRecord("v4".getBytes(), "k4".getBytes()))
@@ -2012,24 +1806,14 @@ class LogTest {
   @Test
   def testCleanShutdownFile() {
     // append some messages to create some segments
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-    val config = LogConfig(logProps)
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val logConfig = createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
 
     val cleanShutdownFile = createCleanShutdownFile()
     assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
     var recoveryPoint = 0L
     // create a log and write some messages to it
-    var log = Log(logDir,
-      config,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    var log = createLog(logDir, logConfig)
     for (_ <- 0 until 100)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
     log.close()
@@ -2037,7 +1821,7 @@ class LogTest {
     // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
     // clean shutdown file exists.
     recoveryPoint = log.logEndOffset
-    log = Log(logDir, config, 0L, 0L, time.scheduler, brokerTopicStats, time)
+    log = createLog(logDir, logConfig)
     assertEquals(recoveryPoint, log.logEndOffset)
     cleanShutdownFile.delete()
   }
@@ -2196,19 +1980,9 @@ class LogTest {
 
   @Test
   def testDeleteOldSegments() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, createRecords.sizeInBytes * 5: java.lang.Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
-    val config = LogConfig(logProps)
-    val log = Log(logDir,
-      config,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 100)
@@ -2257,7 +2031,8 @@ class LogTest {
   @Test
   def testLogDeletionAfterDeleteRecords() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(createRecords.sizeInBytes)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5)
+    val log = createLog(logDir, logConfig)
 
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -2288,7 +2063,8 @@ class LogTest {
   @Test
   def shouldDeleteSizeBasedSegments() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2302,7 +2078,8 @@ class LogTest {
   @Test
   def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 15)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2316,7 +2093,8 @@ class LogTest {
   @Test
   def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() {
     def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10)
-    val log = createLog(createRecords.sizeInBytes, retentionMs = 10000)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2329,8 +2107,9 @@ class LogTest {
 
   @Test
   def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() {
-    def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = time.milliseconds)
-    val log = createLog(createRecords.sizeInBytes, retentionMs = 10000000)
+    def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000)
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2344,16 +2123,15 @@ class LogTest {
   @Test
   def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() {
     def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
-    val log = createLog(createRecords.sizeInBytes,
-      retentionMs = 10000,
-      cleanupPolicy = "compact")
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
       log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     // mark oldest segment as older the retention.ms
-    log.logSegments.head.lastModified = time.milliseconds - 20000
+    log.logSegments.head.lastModified = mockTime.milliseconds - 20000
 
     val segments = log.numberOfSegments
     log.onHighWatermarkIncremented(log.logEndOffset)
@@ -2364,9 +2142,8 @@ class LogTest {
   @Test
   def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() {
     def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
-    val log = createLog(createRecords.sizeInBytes,
-      retentionMs = 10000,
-      cleanupPolicy = "compact,delete")
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete")
+    val log = createLog(logDir, logConfig)
 
     // append some messages to create some segments
     for (_ <- 0 until 15)
@@ -2383,8 +2160,7 @@ class LogTest {
 
     //Given this partition is on leader epoch 72
     val epoch = 72
-    val log = Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val log = createLog(logDir, LogConfig())
     log.leaderEpochCache.assign(epoch, records.size)
 
     //When appending messages as a leader (i.e. assignOffsets = true)
@@ -2416,8 +2192,7 @@ class LogTest {
       recs
     }
 
-    val log = Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val log = createLog(logDir, LogConfig())
 
     //When appending as follower (assignOffsets = false)
     for (i <- records.indices)
@@ -2429,7 +2204,8 @@ class LogTest {
   @Test
   def shouldTruncateLeaderEpochsWhenDeletingSegments() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
+    val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
     // Given three segments of 5 messages each
@@ -2453,7 +2229,8 @@ class LogTest {
   @Test
   def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
-    val log = createLog(createRecords.sizeInBytes, retentionBytes = createRecords.sizeInBytes * 10)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
+    val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
     // Given three segments of 5 messages each
@@ -2476,10 +2253,9 @@ class LogTest {
 
   @Test
   def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
-    val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * createRecords.sizeInBytes).toString)
-    val log = Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
+    val logConfig = createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes)
+    val log = createLog(logDir, logConfig)
     val cache = epochCache(log)
 
     //Given 2 segments, 10 messages per segment
@@ -2524,8 +2300,7 @@ class LogTest {
     */
   @Test
   def testLogRecoversForLeaderEpoch() {
-    val log = Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val log = createLog(logDir, LogConfig())
     val leaderEpochCache = epochCache(log)
     val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0)
     log.appendAsFollower(records = firstBatch)
@@ -2547,8 +2322,7 @@ class LogTest {
     log.close()
 
     // reopen the log and recover from the beginning
-    val recoveredLog = Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats, time = time)
+    val recoveredLog = createLog(logDir, LogConfig())
     val recoveredLeaderEpochCache = epochCache(recoveredLog)
 
     // epoch entries should be recovered
@@ -2576,7 +2350,8 @@ class LogTest {
   }
 
   def testFirstUnstableOffsetNoTransactionalData() {
-    val log = createLog(1024 * 1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
 
     val records = MemoryRecords.withRecords(CompressionType.NONE,
       new SimpleRecord("foo".getBytes),
@@ -2589,7 +2364,8 @@ class LogTest {
 
   @Test
   def testFirstUnstableOffsetWithTransactionalData() {
-    val log = createLog(1024 * 1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
 
     val pid = 137L
     val epoch = 5.toShort
@@ -2626,7 +2402,8 @@ class LogTest {
 
   @Test
   def testTransactionIndexUpdated(): Unit = {
-    val log = createLog(1024 * 1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
     val pid1 = 1L
@@ -2666,7 +2443,8 @@ class LogTest {
 
   @Test
   def testFullTransactionIndexRecovery(): Unit = {
-    val log = createLog(128)
+    val logConfig = createLogConfig(segmentBytes = 128 * 5)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
     val pid1 = 1L
@@ -2708,14 +2486,16 @@ class LogTest {
 
     log.close()
 
-    val reloadedLog = createLog(1024)
+    val reloadedLogConfig = createLogConfig(segmentBytes = 1024 * 5)
+    val reloadedLog = createLog(logDir, reloadedLogConfig)
     val abortedTransactions = allAbortedTransactions(reloadedLog)
     assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
   }
 
   @Test
   def testRecoverOnlyLastSegment(): Unit = {
-    val log = createLog(128)
+    val logConfig = createLogConfig(segmentBytes = 128 * 5)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
     val pid1 = 1L
@@ -2757,14 +2537,16 @@ class LogTest {
 
     log.close()
 
-    val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint)
+    val reloadedLogConfig = createLogConfig(segmentBytes = 1024 * 5)
+    val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint)
     val abortedTransactions = allAbortedTransactions(reloadedLog)
     assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
   }
 
   @Test
   def testRecoverLastSegmentWithNoSnapshots(): Unit = {
-    val log = createLog(128)
+    val logConfig = createLogConfig(segmentBytes = 128 * 5)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
 
     val pid1 = 1L
@@ -2812,7 +2594,8 @@ class LogTest {
 
     log.close()
 
-    val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint)
+    val reloadedLogConfig = createLogConfig(segmentBytes = 1024 * 5)
+    val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint)
     val abortedTransactions = allAbortedTransactions(reloadedLog)
     assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
   }
@@ -2820,7 +2603,8 @@ class LogTest {
   @Test
   def testTransactionIndexUpdatedThroughReplication(): Unit = {
     val epoch = 0.toShort
-    val log = createLog(1024 * 1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
     val buffer = ByteBuffer.allocate(2048)
 
     val pid1 = 1L
@@ -2865,7 +2649,8 @@ class LogTest {
   def testZombieCoordinatorFenced(): Unit = {
     val pid = 1L
     val epoch = 0.toShort
-    val log = createLog(1024 * 1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
 
     val append = appendTransactionalAsLeader(log, pid, epoch)
 
@@ -2880,7 +2665,8 @@ class LogTest {
 
   @Test
   def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
-    val log = createLog(1024 * 1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
     val pid = 1L
     val appendPid = appendTransactionalAsLeader(log, pid, epoch)
@@ -2903,7 +2689,8 @@ class LogTest {
 
   @Test
   def testFirstUnstableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = {
-    val log = createLog(1024 * 1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
     val epoch = 0.toShort
     val pid = 1L
     val appendPid = appendTransactionalAsLeader(log, pid, epoch)
@@ -2929,7 +2716,8 @@ class LogTest {
 
   @Test
   def testLastStableOffsetWithMixedProducerData() {
-    val log = createLog(1024 * 1024)
+    val logConfig = createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
 
     // for convenience, both producers share the same epoch
     val epoch = 5.toShort
@@ -2989,7 +2777,8 @@ class LogTest {
       new SimpleRecord("b".getBytes),
       new SimpleRecord("c".getBytes))
 
-    val log = createLog(messageSizeInBytes = records.sizeInBytes, messagesPerSegment = 1)
+    val logConfig = createLogConfig(segmentBytes = records.sizeInBytes)
+    val log = createLog(logDir, logConfig)
 
     val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
     assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset.map(_.messageOffset))
@@ -3019,26 +2808,52 @@ class LogTest {
     assertEquals(new AbortedTransaction(pid, 0), fetchDataInfo.abortedTransactions.get.head)
   }
 
-  private def createLog(messageSizeInBytes: Int, retentionMs: Int = -1, retentionBytes: Int = -1,
-                        cleanupPolicy: String = "delete", messagesPerSegment: Int = 5,
-                        maxPidExpirationMs: Int = 300000, pidExpirationCheckIntervalMs: Int = 30000,
-                        recoveryPoint: Long = 0L, logStartOffset: Long = 0L): Log = {
+  def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
+                      segmentBytes: Int = Defaults.SegmentSize,
+                      retentionMs: Long = Defaults.RetentionMs,
+                      retentionBytes: Long = Defaults.RetentionSize,
+                      segmentJitterMs: Long = Defaults.SegmentJitterMs,
+                      cleanupPolicy: String = Defaults.CleanupPolicy,
+                      maxMessageBytes: Int = Defaults.MaxMessageSize,
+                      indexIntervalBytes: Int = Defaults.IndexInterval,
+                      segmentIndexBytes: Int = Defaults.MaxIndexSize,
+                      messageFormatVersion: String = Defaults.MessageFormatVersion,
+                      fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): LogConfig = {
     val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, messageSizeInBytes * messagesPerSegment: Integer)
-    logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
-    logProps.put(LogConfig.RetentionBytesProp, retentionBytes: Integer)
+
+    logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long)
+    logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer)
+    logProps.put(LogConfig.RetentionMsProp, retentionMs: java.lang.Long)
+    logProps.put(LogConfig.RetentionBytesProp, retentionBytes: java.lang.Long)
+    logProps.put(LogConfig.SegmentJitterMsProp, segmentJitterMs: java.lang.Long)
     logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
-    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
-    val config = LogConfig(logProps)
-    Log(logDir,
-      config,
-      logStartOffset = logStartOffset,
-      recoveryPoint = recoveryPoint,
-      scheduler = time.scheduler,
-      brokerTopicStats = brokerTopicStats,
-      time = time,
-      maxProducerIdExpirationMs = maxPidExpirationMs,
-      producerIdExpirationCheckIntervalMs = pidExpirationCheckIntervalMs)
+    logProps.put(LogConfig.MaxMessageBytesProp, maxMessageBytes: Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer)
+    logProps.put(LogConfig.MessageFormatVersionProp, messageFormatVersion)
+    logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: java.lang.Long)
+    LogConfig(logProps)
+  }
+
+  def createLog(dir: File,
+                config: LogConfig,
+                logStartOffset: Long = 0L,
+                recoveryPoint: Long = 0L,
+                scheduler: Scheduler = mockTime.scheduler,
+                brokerTopicStats: BrokerTopicStats = brokerTopicStats,
+                time: Time = mockTime,
+                maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
+                producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs): Log = {
+    Log(dir = dir,
+        config = config,
+        logStartOffset = logStartOffset,
+        recoveryPoint = recoveryPoint,
+        scheduler = scheduler,
+        brokerTopicStats = brokerTopicStats,
+        time = time,
+        maxProducerIdExpirationMs = maxProducerIdExpirationMs,
+        producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
+        logDirFailureChannel = new LogDirFailureChannel(10))
   }
 
   private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
@@ -3087,7 +2902,7 @@ class LogTest {
   private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, offset: Long,
                                          controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = {
     val marker = new EndTransactionMarker(controlType, coordinatorEpoch)
-    MemoryRecords.writeEndTransactionalMarker(buffer, offset, time.milliseconds(), 0, producerId, producerEpoch, marker)
+    MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), 0, producerId, producerEpoch, marker)
   }
 
   private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = {
@@ -3111,5 +2926,4 @@ class LogTest {
     assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
     cleanShutdownFile
   }
-
 }


Mime
View raw message