kafka-commits mailing list archives

From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-13068: Rename Log to UnifiedLog (#11154)
Date Thu, 12 Aug 2021 23:11:53 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new db1f581  KAFKA-13068: Rename Log to UnifiedLog (#11154)
db1f581 is described below

commit db1f581da7f3440cfd5be93800b4a9a2d7327a35
Author: Kowshik Prakasam <kprakasam@confluent.io>
AuthorDate: Thu Aug 12 16:10:19 2021 -0700

    KAFKA-13068: Rename Log to UnifiedLog (#11154)
    
    In this PR, I've renamed kafka.log.Log to kafka.log.UnifiedLog. With the advent of KIP-405, the existing Log class will going forward present a unified view of both local and tiered log segments, so we rename it to UnifiedLog. The motivation for this PR is the same as outlined in this design document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit.
    This PR is a follow-up to #10280, where we refactored the Log layer by introducing a new kafka.log.LocalLog class.
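    
    For illustration, a minimal, simplified sketch of this layering in Scala (constructor
    argument lists are heavily trimmed relative to the real classes, so these are not the
    actual signatures): the factory builds a LocalLog for the on-disk segments and wraps it
    in a UnifiedLog, which is the abstraction the rest of the broker works against.
    
        // Simplified stand-ins for kafka.log.LocalLog and kafka.log.UnifiedLog.
        final class LocalLog(val dir: java.io.File)            // local, on-disk segments
        final class UnifiedLog(val logStartOffset: Long,
                               private val localLog: LocalLog) // unified view over local
                                                               // (and, with KIP-405, tiered) data
        object UnifiedLog {
          // Mirrors the shape of the real UnifiedLog(...) factory in the diff below,
          // with most parameters omitted.
          def apply(dir: java.io.File, logStartOffset: Long): UnifiedLog =
            new UnifiedLog(logStartOffset, new LocalLog(dir))
        }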
    
    Note: the Log class name had to be hardcoded to ensure metrics are defined under the Log class (for backwards compatibility). Please refer to the newly introduced UnifiedLog.metricName() method.
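    
    A self-contained sketch of this compatibility trick, using a hypothetical stand-in for
    com.yammer.metrics.core.MetricName and a made-up MetricsGroup trait rather than the real
    KafkaMetricsGroup: the class is renamed to UnifiedLog, but the metric type component is
    pinned to "Log" so existing JMX queries and dashboards keep resolving.
    
        // Stand-in for the Yammer MetricName (group/type/name only, for illustration).
        final case class MetricName(group: String, typeName: String, name: String)
    
        trait MetricsGroup {
          // Default: derive the metric type from the runtime class name,
          // which after the rename would report "UnifiedLog".
          def metricName(name: String): MetricName = {
            val pkg = Option(getClass.getPackage).map(_.getName).getOrElse("")
            MetricName(pkg, getClass.getSimpleName, name)
          }
        }
    
        class UnifiedLog extends MetricsGroup {
          // The override pins the type component to the historical "Log" name.
          override def metricName(name: String): MetricName = {
            val pkg = Option(getClass.getPackage).map(_.getName).getOrElse("")
            MetricName(pkg, "Log", name)
          }
        }
    
        // e.g. new UnifiedLog().metricName("Size") yields MetricName(<pkg>, "Log", "Size")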
    
    Reviewers: Cong Ding <cong@ccding.com>, Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 32 ++++----
 core/src/main/scala/kafka/cluster/Replica.scala    |  4 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     | 14 ++--
 .../main/scala/kafka/log/LogCleanerManager.scala   | 18 ++--
 core/src/main/scala/kafka/log/LogLoader.scala      | 28 +++----
 core/src/main/scala/kafka/log/LogManager.scala     | 70 ++++++++--------
 core/src/main/scala/kafka/log/LogSegment.scala     | 16 ++--
 .../scala/kafka/log/ProducerStateManager.scala     |  8 +-
 .../kafka/log/{Log.scala => UnifiedLog.scala}      | 59 +++++++------
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |  8 +-
 core/src/main/scala/kafka/raft/RaftManager.scala   |  4 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  4 +-
 .../scala/kafka/server/LogOffsetMetadata.scala     |  6 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 30 +++----
 .../server/metadata/BrokerMetadataPublisher.scala  |  4 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   | 18 ++--
 .../api/GroupCoordinatorIntegrationTest.scala      |  4 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |  4 +-
 .../src/test/scala/other/kafka/StressTestLog.scala |  6 +-
 .../scala/other/kafka/TestLinearWriteSpeed.scala   |  2 +-
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |  4 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  8 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 28 +++----
 .../scala/unit/kafka/cluster/ReplicaTest.scala     |  6 +-
 .../AbstractCoordinatorConcurrencyTest.scala       | 12 +--
 .../group/GroupMetadataManagerTest.scala           | 12 +--
 .../TransactionCoordinatorConcurrencyTest.scala    |  4 +-
 .../transaction/TransactionStateManagerTest.scala  |  8 +-
 .../log/AbstractLogCleanerIntegrationTest.scala    | 10 +--
 .../unit/kafka/log/BrokerCompressionTest.scala     |  2 +-
 .../unit/kafka/log/LogCleanerIntegrationTest.scala |  4 +-
 .../kafka/log/LogCleanerLagIntegrationTest.scala   |  4 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 62 +++++++-------
 .../LogCleanerParameterizedIntegrationTest.scala   |  8 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 56 ++++++-------
 .../scala/unit/kafka/log/LogConcurrencyTest.scala  | 12 +--
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  | 96 +++++++++++-----------
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  6 +-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  2 +-
 .../scala/unit/kafka/log/LogSegmentsTest.scala     |  2 +-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   | 56 ++++++-------
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 22 ++---
 .../log/{LogTest.scala => UnifiedLogTest.scala}    | 68 +++++++--------
 .../unit/kafka/server/IsrExpirationTest.scala      |  8 +-
 .../unit/kafka/server/KafkaRaftServerTest.scala    |  4 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |  8 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     | 44 +++++-----
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 28 +++----
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  4 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  6 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  4 +-
 .../metadata/BrokerMetadataPublisherTest.scala     |  6 +-
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     |  6 +-
 .../scala/unit/kafka/utils/SchedulerTest.scala     |  8 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  2 +-
 56 files changed, 486 insertions(+), 477 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 506e665..76d4193 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -243,9 +243,9 @@ class Partition(val topicPartition: TopicPartition,
   // is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy
   // completes and a switch to new location is performed.
   // log and futureLog variables defined below are used to capture this
-  @volatile var log: Option[Log] = None
+  @volatile var log: Option[UnifiedLog] = None
   // If ReplicaAlterLogDir command is in progress, this is future location of the log
-  @volatile var futureLog: Option[Log] = None
+  @volatile var futureLog: Option[UnifiedLog] = None
 
   /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
    * One way of doing that is through the controller's start replica state change command. When a new broker starts up
@@ -313,10 +313,10 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
-    def maybeCreate(logOpt: Option[Log]): Log = {
+    def maybeCreate(logOpt: Option[UnifiedLog]): UnifiedLog = {
       logOpt match {
         case Some(log) =>
-          trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+          trace(s"${if (isFutureReplica) "Future UnifiedLog" else "UnifiedLog"} already exists.")
           if (log.topicId.isEmpty)
             topicId.foreach(log.assignTopicId)
           log
@@ -333,8 +333,8 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   // Visible for testing
-  private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Log = {
-    def updateHighWatermark(log: Log) = {
+  private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
+    def updateHighWatermark(log: UnifiedLog) = {
       val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse {
         info(s"No checkpointed highwatermark is found for partition $topicPartition")
         0L
@@ -344,7 +344,7 @@ class Partition(val topicPartition: TopicPartition,
     }
 
     logManager.initializingLog(topicPartition)
-    var maybeLog: Option[Log] = None
+    var maybeLog: Option[UnifiedLog] = None
     try {
       val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId)
       maybeLog = Some(log)
@@ -373,7 +373,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private def getLocalLog(currentLeaderEpoch: Optional[Integer],
-                          requireLeader: Boolean): Either[Log, Errors] = {
+                          requireLeader: Boolean): Either[UnifiedLog, Errors] = {
     checkCurrentLeaderEpoch(currentLeaderEpoch) match {
       case Errors.NONE =>
         if (requireLeader && !isLeader) {
@@ -391,17 +391,17 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def localLogOrException: Log = log.getOrElse {
+  def localLogOrException: UnifiedLog = log.getOrElse {
     throw new NotLeaderOrFollowerException(s"Log for partition $topicPartition is not available " +
       s"on broker $localBrokerId")
   }
 
-  def futureLocalLogOrException: Log = futureLog.getOrElse {
+  def futureLocalLogOrException: UnifiedLog = futureLog.getOrElse {
     throw new NotLeaderOrFollowerException(s"Future log for partition $topicPartition is not available " +
       s"on broker $localBrokerId")
   }
 
-  def leaderLogIfLocal: Option[Log] = {
+  def leaderLogIfLocal: Option[UnifiedLog] = {
     log.filter(_ => isLeader)
   }
 
@@ -411,7 +411,7 @@ class Partition(val topicPartition: TopicPartition,
   def isLeader: Boolean = leaderReplicaIdOpt.contains(localBrokerId)
 
   private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer],
-                                           requireLeader: Boolean): Log = {
+                                           requireLeader: Boolean): UnifiedLog = {
     getLocalLog(currentLeaderEpoch, requireLeader) match {
       case Left(localLog) => localLog
       case Right(error) =>
@@ -422,7 +422,7 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   // Visible for testing -- Used by unit tests to set log for this partition
-  def setLog(log: Log, isFutureLog: Boolean): Unit = {
+  def setLog(log: UnifiedLog, isFutureLog: Boolean): Unit = {
     if (isFutureLog)
       futureLog = Some(log)
     else
@@ -576,9 +576,9 @@ class Partition(val topicPartition: TopicPartition,
         remoteReplicas.foreach { replica =>
           replica.updateFetchState(
             followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
-            followerStartOffset = Log.UnknownOffset,
+            followerStartOffset = UnifiedLog.UnknownOffset,
             followerFetchTimeMs = 0L,
-            leaderEndOffset = Log.UnknownOffset)
+            leaderEndOffset = UnifiedLog.UnknownOffset)
         }
       }
       // we may need to increment high watermark since ISR could be down to 1
@@ -843,7 +843,7 @@ class Partition(val topicPartition: TopicPartition,
    *
    * @return true if the HW was incremented, and false otherwise.
    */
-  private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
+  private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, curTime: Long = time.milliseconds): Boolean = {
     // maybeIncrementLeaderHW is in the hot path, the following code is written to
     // avoid unnecessary collection generation
     var newHighWatermark = leaderLog.logEndOffsetMetadata
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index ffc1c99..921faef 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -17,7 +17,7 @@
 
 package kafka.cluster
 
-import kafka.log.Log
+import kafka.log.UnifiedLog
 import kafka.server.LogOffsetMetadata
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
@@ -28,7 +28,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
   @volatile private[this] var _logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
   // the log start offset value, kept in all replicas;
   // for local replica it is the log's start offset, for remote replicas its value is only updated by follower fetch
-  @volatile private[this] var _logStartOffset = Log.UnknownOffset
+  @volatile private[this] var _logStartOffset = UnifiedLog.UnknownOffset
 
   // The log end offset value at the time the leader received the last FetchRequest from this follower
   // This is used to determine the lastCaughtUpTimeMs of the follower
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 42eb2a6..1655dfb 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -90,7 +90,7 @@ import scala.util.control.ControlThrowable
  */
 class LogCleaner(initialConfig: CleanerConfig,
                  val logDirs: Seq[File],
-                 val logs: Pool[TopicPartition, Log],
+                 val logs: Pool[TopicPartition, UnifiedLog],
                  val logDirFailureChannel: LogDirFailureChannel,
                  time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable
 {
@@ -272,7 +272,7 @@ class LogCleaner(initialConfig: CleanerConfig,
     * retention threads need to make this call to obtain:
     * @return A list of log partitions that retention threads can safely work on
     */
-  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
     cleanerManager.pauseCleaningForNonCompactedPartitions()
   }
 
@@ -356,7 +356,7 @@ class LogCleaner(initialConfig: CleanerConfig,
             case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e)
           }
       }
-      val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
+      val deletable: Iterable[(TopicPartition, UnifiedLog)] = cleanerManager.deletableLogs()
       try {
         deletable.foreach { case (_, log) =>
           try {
@@ -549,14 +549,14 @@ private[log] class Cleaner(val id: Int,
    * @param transactionMetadata State of ongoing transactions which is carried between the cleaning
    *                            of the grouped segments
    */
-  private[log] def cleanSegments(log: Log,
+  private[log] def cleanSegments(log: UnifiedLog,
                                  segments: Seq[LogSegment],
                                  map: OffsetMap,
                                  deleteHorizonMs: Long,
                                  stats: CleanerStats,
                                  transactionMetadata: CleanedTransactionMetadata): Unit = {
     // create a new segment with a suffix appended to the name of the log and indexes
-    val cleaned = Log.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
+    val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
     transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
 
     try {
@@ -876,7 +876,7 @@ private[log] class Cleaner(val id: Int,
    * @param map The map in which to store the mappings
    * @param stats Collector for cleaning statistics
    */
-  private[log] def buildOffsetMap(log: Log,
+  private[log] def buildOffsetMap(log: UnifiedLog,
                                   start: Long,
                                   end: Long,
                                   map: OffsetMap,
@@ -1063,7 +1063,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
   * and whether it needs compaction immediately.
   */
 private case class LogToClean(topicPartition: TopicPartition,
-                              log: Log,
+                              log: UnifiedLog,
                               firstDirtyOffset: Long,
                               uncleanableOffset: Long,
                               needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index eea889a..7e14184 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -38,7 +38,7 @@ private[log] case object LogCleaningInProgress extends LogCleaningState
 private[log] case object LogCleaningAborted extends LogCleaningState
 private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState
 
-private[log] class LogCleaningException(val log: Log,
+private[log] class LogCleaningException(val log: UnifiedLog,
                                         private val message: String,
                                         private val cause: Throwable) extends KafkaException(message, cause)
 
@@ -59,7 +59,7 @@ private[log] class LogCleaningException(val log: Log,
   *                              Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
   */
 private[log] class LogCleanerManager(val logDirs: Seq[File],
-                                     val logs: Pool[TopicPartition, Log],
+                                     val logs: Pool[TopicPartition, UnifiedLog],
                                      val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
   import LogCleanerManager._
 
@@ -216,7 +216,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     * switch topic configuration between compacted and non-compacted topic.
     * @return retention logs that have log cleaning successfully paused
     */
-  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
     inLock(lock) {
       val deletableLogs = logs.filter {
         case (_, log) => !log.config.compact // pick non-compacted logs
@@ -236,7 +236,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     * Include logs without delete enabled, as they may have segments
     * that precede the start offset.
     */
-  def deletableLogs(): Iterable[(TopicPartition, Log)] = {
+  def deletableLogs(): Iterable[(TopicPartition, UnifiedLog)] = {
     inLock(lock) {
       val toClean = logs.filter { case (topicPartition, log) =>
         !inProgress.contains(topicPartition) && log.config.compact &&
@@ -506,7 +506,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = {
+  private def isUncleanablePartition(log: UnifiedLog, topicPartition: TopicPartition): Boolean = {
     inLock(lock) {
       uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
     }
@@ -529,7 +529,7 @@ private case class OffsetsToClean(firstDirtyOffset: Long,
 
 private[log] object LogCleanerManager extends Logging {
 
-  def isCompactAndDelete(log: Log): Boolean = {
+  def isCompactAndDelete(log: UnifiedLog): Boolean = {
     log.config.compact && log.config.delete
   }
 
@@ -537,7 +537,7 @@ private[log] object LogCleanerManager extends Logging {
     * get max delay between the time when log is required to be compacted as determined
     * by maxCompactionLagMs and the current time.
     */
-  def maxCompactionDelay(log: Log, firstDirtyOffset: Long, now: Long) : Long = {
+  def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = {
     val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
     val firstBatchTimestamps = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).filter(_ > 0)
 
@@ -564,7 +564,7 @@ private[log] object LogCleanerManager extends Logging {
     * @param now the current time in milliseconds of the cleaning operation
     * @return OffsetsToClean containing offsets for cleanable portion of log and whether the log checkpoint needs updating
     */
-  def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): OffsetsToClean = {
+  def cleanableOffsets(log: UnifiedLog, lastCleanOffset: Option[Long], now: Long): OffsetsToClean = {
     // If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
     // reset to the log starting offset and log the error
     val (firstDirtyOffset, forceUpdateCheckpoint) = {
@@ -626,7 +626,7 @@ private[log] object LogCleanerManager extends Logging {
    * Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
    * @return the biggest uncleanable offset and the total amount of cleanable bytes
    */
-  def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
+  def calculateCleanableBytes(log: UnifiedLog, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
     val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
     val firstUncleanableOffset = firstUncleanableSegment.baseOffset
     val cleanableBytes = log.logSegments(math.min(firstDirtyOffset, firstUncleanableOffset), firstUncleanableOffset).map(_.size.toLong).sum
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index 1eb6b9e..b075069 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 
 import kafka.common.LogSegmentOffsetOverflowException
-import kafka.log.Log.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
+import kafka.log.UnifiedLog.{CleanedFileSuffix, DeletedFileSuffix, SwapFileSuffix, isIndexFile, isLogFile, offsetFromFile}
 import kafka.server.{LogDirFailureChannel, LogOffsetMetadata}
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.{CoreUtils, Logging, Scheduler}
@@ -111,14 +111,14 @@ object LogLoader extends Logging {
     // We store segments that require renaming in this code block, and do the actual renaming later.
     var minSwapFileOffset = Long.MaxValue
     var maxSwapFileOffset = Long.MinValue
-    swapFiles.filter(f => Log.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
+    swapFiles.filter(f => UnifiedLog.isLogFile(new File(CoreUtils.replaceSuffix(f.getPath, SwapFileSuffix, "")))).foreach { f =>
       val baseOffset = offsetFromFile(f)
       val segment = LogSegment.open(f.getParentFile,
         baseOffset = baseOffset,
         params.config,
         time = params.time,
-        fileSuffix = Log.SwapFileSuffix)
-      info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${Log.SwapFileSuffix} files by renaming.")
+        fileSuffix = UnifiedLog.SwapFileSuffix)
+      info(s"${params.logIdentifier}Found log file ${f.getPath} from interrupted swap operation, which is recoverable from ${UnifiedLog.SwapFileSuffix} files by renaming.")
       minSwapFileOffset = Math.min(segment.baseOffset, minSwapFileOffset)
       maxSwapFileOffset = Math.max(segment.readNextOffset, maxSwapFileOffset)
     }
@@ -145,8 +145,8 @@ object LogLoader extends Logging {
     // Third pass: rename all swap files.
     for (file <- params.dir.listFiles if file.isFile) {
       if (file.getName.endsWith(SwapFileSuffix)) {
-        info(s"${params.logIdentifier}Recovering file ${file.getName} by renaming from ${Log.SwapFileSuffix} files.")
-        file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.SwapFileSuffix, "")))
+        info(s"${params.logIdentifier}Recovering file ${file.getName} by renaming from ${UnifiedLog.SwapFileSuffix} files.")
+        file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.SwapFileSuffix, "")))
       }
     }
 
@@ -163,7 +163,7 @@ object LogLoader extends Logging {
     })
 
     val (newRecoveryPoint: Long, nextOffset: Long) = {
-      if (!params.dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
+      if (!params.dir.getAbsolutePath.endsWith(UnifiedLog.DeleteDirSuffix)) {
         val (newRecoveryPoint, nextOffset) = retryOnOffsetOverflow(params, {
           recoverLog(params)
         })
@@ -199,7 +199,7 @@ object LogLoader extends Logging {
     // during log recovery may have deleted some files without the LogLoader.producerStateManager instance witnessing the
     // deletion.
     params.producerStateManager.removeStraySnapshots(params.segments.baseOffsets.toSeq)
-    Log.rebuildProducerState(
+    UnifiedLog.rebuildProducerState(
       params.producerStateManager,
       params.segments,
       newLogStartOffset,
@@ -281,7 +281,7 @@ object LogLoader extends Logging {
       } catch {
         case e: LogSegmentOffsetOverflowException =>
           info(s"${params.logIdentifier}Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
-          val result = Log.splitOverflowedSegment(
+          val result = UnifiedLog.splitOverflowedSegment(
             e.segment,
             params.segments,
             params.dir,
@@ -314,7 +314,7 @@ object LogLoader extends Logging {
       if (isIndexFile(file)) {
         // if it is an index file, make sure it has a corresponding .log file
         val offset = offsetFromFile(file)
-        val logFile = Log.logFile(params.dir, offset)
+        val logFile = UnifiedLog.logFile(params.dir, offset)
         if (!logFile.exists) {
           warn(s"${params.logIdentifier}Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
           Files.deleteIfExists(file.toPath)
@@ -322,7 +322,7 @@ object LogLoader extends Logging {
       } else if (isLogFile(file)) {
         // if it's a log file, load the corresponding log segment
         val baseOffset = offsetFromFile(file)
-        val timeIndexFileNewlyCreated = !Log.timeIndexFile(params.dir, baseOffset).exists()
+        val timeIndexFileNewlyCreated = !UnifiedLog.timeIndexFile(params.dir, baseOffset).exists()
         val segment = LogSegment.open(
           dir = params.dir,
           baseOffset = baseOffset,
@@ -363,7 +363,7 @@ object LogLoader extends Logging {
       params.dir,
       params.maxProducerIdExpirationMs,
       params.time)
-    Log.rebuildProducerState(
+    UnifiedLog.rebuildProducerState(
       producerStateManager,
       params.segments,
       params.logStartOffsetCheckpoint,
@@ -497,7 +497,7 @@ object LogLoader extends Logging {
       toDelete.foreach { segment =>
         params.segments.remove(segment.baseOffset)
       }
-      Log.deleteSegmentFiles(
+      UnifiedLog.deleteSegmentFiles(
         toDelete,
         asyncDelete = true,
         params.dir,
@@ -512,7 +512,7 @@ object LogLoader extends Logging {
 
   private def deleteProducerSnapshotsAsync(segments: Iterable[LogSegment],
                                            params: LoadLogParams): Unit = {
-    Log.deleteProducerSnapshots(segments,
+    UnifiedLog.deleteProducerSnapshots(segments,
       params.producerStateManager,
       asyncDelete = true,
       params.scheduler,
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 424ff65..695d963 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -77,13 +77,13 @@ class LogManager(logDirs: Seq[File],
   val InitialTaskDelayMs = 30 * 1000
 
   private val logCreationOrDeletionLock = new Object
-  private val currentLogs = new Pool[TopicPartition, Log]()
+  private val currentLogs = new Pool[TopicPartition, UnifiedLog]()
   // Future logs are put in the directory with "-future" suffix. Future log is created when user wants to move replica
   // from one log directory to another log directory on the same broker. The directory of the future log will be renamed
   // to replace the current log of the partition after the future log catches up with the current log
-  private val futureLogs = new Pool[TopicPartition, Log]()
+  private val futureLogs = new Pool[TopicPartition, UnifiedLog]()
   // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
-  private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
+  private val logsToBeDeleted = new LinkedBlockingQueue[(UnifiedLog, Long)]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile private var _currentDefaultConfig = initialDefaultConfig
@@ -206,7 +206,7 @@ class LogManager(logDirs: Seq[File],
       if (cleaner != null)
         cleaner.handleLogDirFailure(dir)
 
-      def removeOfflineLogs(logs: Pool[TopicPartition, Log]): Iterable[TopicPartition] = {
+      def removeOfflineLogs(logs: Pool[TopicPartition, UnifiedLog]): Iterable[TopicPartition] = {
         val offlineTopicPartitions: Iterable[TopicPartition] = logs.collect {
           case (tp, log) if log.parentDir == dir => tp
         }
@@ -248,7 +248,7 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
-  private def addLogToBeDeleted(log: Log): Unit = {
+  private def addLogToBeDeleted(log: UnifiedLog): Unit = {
     this.logsToBeDeleted.add((log, time.milliseconds()))
   }
 
@@ -260,13 +260,13 @@ class LogManager(logDirs: Seq[File],
                            recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long],
                            defaultConfig: LogConfig,
-                           topicConfigOverrides: Map[String, LogConfig]): Log = {
-    val topicPartition = Log.parseTopicPartitionName(logDir)
+                           topicConfigOverrides: Map[String, LogConfig]): UnifiedLog = {
+    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)
     val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
     val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
 
-    val log = Log(
+    val log = UnifiedLog(
       dir = logDir,
       config = config,
       logStartOffset = logStartOffset,
@@ -281,7 +281,7 @@ class LogManager(logDirs: Seq[File],
       topicId = None,
       keepPartitionMetadataFile = keepPartitionMetadataFile)
 
-    if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
+    if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
       addLogToBeDeleted(log)
     } else {
       val previous = {
@@ -354,7 +354,7 @@ class LogManager(logDirs: Seq[File],
         }
 
         val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
-          logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
+          logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
         val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
 
@@ -567,7 +567,7 @@ class LogManager(logDirs: Seq[File],
    * @param isFuture True iff the truncation should be performed on the future log of the specified partitions
    */
   def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean): Unit = {
-    val affectedLogs = ArrayBuffer.empty[Log]
+    val affectedLogs = ArrayBuffer.empty[UnifiedLog]
     for ((topicPartition, truncateOffset) <- partitionOffsets) {
       val log = {
         if (isFuture)
@@ -668,7 +668,7 @@ class LogManager(logDirs: Seq[File],
    * @param logDir the directory in which the logs are
    * @param logsToCheckpoint the logs to be checkpointed
    */
-  private def checkpointRecoveryOffsetsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, Log]): Unit = {
+  private def checkpointRecoveryOffsetsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, UnifiedLog]): Unit = {
     try {
       recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
         val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
@@ -691,7 +691,7 @@ class LogManager(logDirs: Seq[File],
    * @param logDir the directory in which logs are checkpointed
    * @param logsToCheckpoint the logs to be checkpointed
    */
-  private def checkpointLogStartOffsetsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, Log]): Unit = {
+  private def checkpointLogStartOffsetsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, UnifiedLog]): Unit = {
     try {
       logStartOffsetCheckpoints.get(logDir).foreach { checkpoint =>
         val logStartOffsets = logsToCheckpoint.collect {
@@ -737,7 +737,7 @@ class LogManager(logDirs: Seq[File],
    * Truncate the cleaner's checkpoint to the based offset of the active segment of
    * the provided log.
    */
-  private def maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log: Log, topicPartition: TopicPartition): Unit = {
+  private def maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log: UnifiedLog, topicPartition: TopicPartition): Unit = {
     if (cleaner != null) {
       cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
     }
@@ -749,7 +749,7 @@ class LogManager(logDirs: Seq[File],
    * @param topicPartition the partition of the log
    * @param isFuture True iff the future log of the specified partition should be returned
    */
-  def getLog(topicPartition: TopicPartition, isFuture: Boolean = false): Option[Log] = {
+  def getLog(topicPartition: TopicPartition, isFuture: Boolean = false): Option[UnifiedLog] = {
     if (isFuture)
       Option(futureLogs.get(topicPartition))
     else
@@ -792,7 +792,7 @@ class LogManager(logDirs: Seq[File],
    * relevant log was being loaded.
    */
   def finishedInitializingLog(topicPartition: TopicPartition,
-                              maybeLog: Option[Log]): Unit = {
+                              maybeLog: Option[UnifiedLog]): Unit = {
     val removedValue = partitionsInitializing.remove(topicPartition)
     if (removedValue.contains(true))
       maybeLog.foreach(_.updateConfig(fetchLogConfig(topicPartition.topic)))
@@ -810,7 +810,7 @@ class LogManager(logDirs: Seq[File],
    * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker
    * @throws InconsistentTopicIdException if the topic ID in the log does not match the topic ID provided
    */
-  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): Log = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
     logCreationOrDeletionLock synchronized {
       val log = getLog(topicPartition, isFuture).getOrElse {
         // create the log if it has not already been created in another thread
@@ -835,9 +835,9 @@ class LogManager(logDirs: Seq[File],
 
         val logDirName = {
           if (isFuture)
-            Log.logFutureDirName(topicPartition)
+            UnifiedLog.logFutureDirName(topicPartition)
           else
-            Log.logDirName(topicPartition)
+            UnifiedLog.logDirName(topicPartition)
         }
 
         val logDir = logDirs
@@ -848,7 +848,7 @@ class LogManager(logDirs: Seq[File],
           .get // If Failure, will throw
 
         val config = fetchLogConfig(topicPartition.topic)
-        val log = Log(
+        val log = UnifiedLog(
           dir = logDir,
           config = config,
           logStartOffset = 0L,
@@ -976,7 +976,7 @@ class LogManager(logDirs: Seq[File],
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(Log.logDirName(topicPartition))
+      destLog.renameDir(UnifiedLog.logDirName(topicPartition))
       destLog.updateHighWatermark(sourceLog.highWatermark)
 
       // Now that future replica has been successfully renamed to be the current replica
@@ -989,7 +989,7 @@ class LogManager(logDirs: Seq[File],
       }
 
       try {
-        sourceLog.renameDir(Log.logDeleteDirName(topicPartition))
+        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition))
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
         sourceLog.close()
@@ -1023,8 +1023,8 @@ class LogManager(logDirs: Seq[File],
     */
   def asyncDelete(topicPartition: TopicPartition,
                   isFuture: Boolean = false,
-                  checkpoint: Boolean = true): Option[Log] = {
-    val removedLog: Option[Log] = logCreationOrDeletionLock synchronized {
+                  checkpoint: Boolean = true): Option[UnifiedLog] = {
+    val removedLog: Option[UnifiedLog] = logCreationOrDeletionLock synchronized {
       removeLogAndMetrics(if (isFuture) futureLogs else currentLogs, topicPartition)
     }
     removedLog match {
@@ -1036,7 +1036,7 @@ class LogManager(logDirs: Seq[File],
             cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
           }
         }
-        removedLog.renameDir(Log.logDeleteDirName(topicPartition))
+        removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition))
         if (checkpoint) {
           val logDir = removedLog.parentDirFile
           val logsToCheckpoint = logsInDir(logDir)
@@ -1159,9 +1159,9 @@ class LogManager(logDirs: Seq[File],
   /**
    * Get all the partition logs
    */
-  def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values
+  def allLogs: Iterable[UnifiedLog] = currentLogs.values ++ futureLogs.values
 
-  def logsByTopic(topic: String): Seq[Log] = {
+  def logsByTopic(topic: String): Seq[UnifiedLog] = {
     (currentLogs.toList ++ futureLogs.toList).collect {
       case (topicPartition, log) if topicPartition.topic == topic => log
     }
@@ -1170,25 +1170,25 @@ class LogManager(logDirs: Seq[File],
   /**
    * Map of log dir to logs by topic and partitions in that dir
    */
-  private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
+  private def logsByDir: Map[String, Map[TopicPartition, UnifiedLog]] = {
     // This code is called often by checkpoint processes and is written in a way that reduces
     // allocations and CPU with many topic partitions.
     // When changing this code please measure the changes with org.apache.kafka.jmh.server.CheckpointBench
-    val byDir = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Log]]()
-    def addToDir(tp: TopicPartition, log: Log): Unit = {
-      byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
+    val byDir = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, UnifiedLog]]()
+    def addToDir(tp: TopicPartition, log: UnifiedLog): Unit = {
+      byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, UnifiedLog]()).put(tp, log)
     }
     currentLogs.foreachEntry(addToDir)
     futureLogs.foreachEntry(addToDir)
     byDir
   }
 
-  private def logsInDir(dir: File): Map[TopicPartition, Log] = {
+  private def logsInDir(dir: File): Map[TopicPartition, UnifiedLog] = {
     logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
   }
 
-  private def logsInDir(cachedLogsByDir: Map[String, Map[TopicPartition, Log]],
-                        dir: File): Map[TopicPartition, Log] = {
+  private def logsInDir(cachedLogsByDir: Map[String, Map[TopicPartition, UnifiedLog]],
+                        dir: File): Map[TopicPartition, UnifiedLog] = {
     cachedLogsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
   }
 
@@ -1221,7 +1221,7 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
-  private def removeLogAndMetrics(logs: Pool[TopicPartition, Log], tp: TopicPartition): Option[Log] = {
+  private def removeLogAndMetrics(logs: Pool[TopicPartition, UnifiedLog], tp: TopicPartition): Option[UnifiedLog] = {
     val removedLog = logs.remove(tp)
     if (removedLog != null) {
       removedLog.removeLogMetrics()
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 8681562..7daf9c4 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -668,10 +668,10 @@ object LogSegment {
            initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
     val maxIndexSize = config.maxIndexSize
     new LogSegment(
-      FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
-      LazyIndex.forOffset(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
-      LazyIndex.forTime(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
-      new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)),
+      FileRecords.open(UnifiedLog.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
+      LazyIndex.forOffset(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
+      LazyIndex.forTime(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
+      new TransactionIndex(baseOffset, UnifiedLog.transactionIndexFile(dir, baseOffset, fileSuffix)),
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
@@ -679,10 +679,10 @@ object LogSegment {
   }
 
   def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
-    Log.deleteFileIfExists(Log.offsetIndexFile(dir, baseOffset, fileSuffix))
-    Log.deleteFileIfExists(Log.timeIndexFile(dir, baseOffset, fileSuffix))
-    Log.deleteFileIfExists(Log.transactionIndexFile(dir, baseOffset, fileSuffix))
-    Log.deleteFileIfExists(Log.logFile(dir, baseOffset, fileSuffix))
+    UnifiedLog.deleteFileIfExists(UnifiedLog.offsetIndexFile(dir, baseOffset, fileSuffix))
+    UnifiedLog.deleteFileIfExists(UnifiedLog.timeIndexFile(dir, baseOffset, fileSuffix))
+    UnifiedLog.deleteFileIfExists(UnifiedLog.transactionIndexFile(dir, baseOffset, fileSuffix))
+    UnifiedLog.deleteFileIfExists(UnifiedLog.logFile(dir, baseOffset, fileSuffix))
   }
 }
 
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index b499e99..5f5c225 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.{Files, NoSuchFileException, StandardOpenOption}
 import java.util.concurrent.ConcurrentSkipListMap
-import kafka.log.Log.offsetFromFile
+import kafka.log.UnifiedLog.offsetFromFile
 import kafka.server.LogOffsetMetadata
 import kafka.utils.{CoreUtils, Logging, nonthreadsafe, threadsafe}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -452,7 +452,7 @@ object ProducerStateManager {
     }
   }
 
-  private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(Log.ProducerSnapshotFileSuffix)
+  private def isSnapshotFile(file: File): Boolean = file.getName.endsWith(UnifiedLog.ProducerSnapshotFileSuffix)
 
   // visible for testing
   private[log] def listSnapshotFiles(dir: File): Seq[SnapshotFile] = {
@@ -717,7 +717,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   def takeSnapshot(): Unit = {
     // If not a new offset, then it is not worth taking another snapshot
     if (lastMapOffset > lastSnapOffset) {
-      val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, lastMapOffset))
+      val snapshotFile = SnapshotFile(UnifiedLog.producerSnapshotFile(_logDir, lastMapOffset))
       val start = time.hiResClockMs()
       writeSnapshot(snapshotFile.file, producers)
       info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.")
@@ -857,7 +857,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       // deletion, so ignoring the exception here just means that the intended operation was
       // already completed.
       try {
-        snapshot.renameTo(Log.DeletedFileSuffix)
+        snapshot.renameTo(UnifiedLog.DeletedFileSuffix)
         Some(snapshot)
       } catch {
         case _: NoSuchFileException =>
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
similarity index 97%
rename from core/src/main/scala/kafka/log/Log.scala
rename to core/src/main/scala/kafka/log/UnifiedLog.scala
index 7fa5351..58ce6bd 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -17,6 +17,8 @@
 
 package kafka.log
 
+import com.yammer.metrics.core.MetricName
+
 import java.io.{File, IOException}
 import java.nio.file.Files
 import java.util.Optional
@@ -136,7 +138,7 @@ case class LogAppendInfo(var firstOffset: Option[LogOffsetMetadata],
 /**
  * Container class which represents a snapshot of the significant offsets for a partition. This allows fetching
  * of these offsets atomically without the possibility of a leader change affecting their consistency relative
- * to each other. See [[Log.fetchOffsetSnapshot()]].
+ * to each other. See [[UnifiedLog.fetchOffsetSnapshot()]].
  */
 case class LogOffsetSnapshot(logStartOffset: Long,
                              logEndOffset: LogOffsetMetadata,
@@ -248,18 +250,18 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  *                                  will be deleted to avoid ID conflicts upon re-upgrade.
  */
 @threadsafe
-class Log(@volatile var logStartOffset: Long,
-          private val localLog: LocalLog,
-          brokerTopicStats: BrokerTopicStats,
-          val producerIdExpirationCheckIntervalMs: Int,
-          @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
-          val producerStateManager: ProducerStateManager,
-          @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+class UnifiedLog(@volatile var logStartOffset: Long,
+                 private val localLog: LocalLog,
+                 brokerTopicStats: BrokerTopicStats,
+                 val producerIdExpirationCheckIntervalMs: Int,
+                 @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
+                 val producerStateManager: ProducerStateManager,
+                 @volatile private var _topicId: Option[Uuid],
+                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
 
-  import kafka.log.Log._
+  import kafka.log.UnifiedLog._
 
-  this.logIdent = s"[Log partition=$topicPartition, dir=$parentDir] "
+  this.logIdent = s"[UnifiedLog partition=$topicPartition, dir=$parentDir] "
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
@@ -540,6 +542,13 @@ class Log(@volatile var logStartOffset: Long,
     }
   }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
 
+  // For compatibility, metrics are defined to be under `Log` class
+  override def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = {
+    val pkg = getClass.getPackage
+    val pkgStr = if (pkg == null) "" else pkg.getName
+    explicitMetricName(pkgStr, "Log", name, tags)
+  }
+
   private def recordVersion: RecordVersion = config.recordVersion
 
   private def initializePartitionMetadata(): Unit = lock synchronized {
@@ -572,7 +581,7 @@ class Log(@volatile var logStartOffset: Long,
   }
 
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
-    leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
+    leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
   }
 
   private def updateHighWatermarkWithLogEndOffset(): Unit = {
@@ -600,7 +609,7 @@ class Log(@volatile var logStartOffset: Long,
   private def rebuildProducerState(lastOffset: Long,
                                    producerStateManager: ProducerStateManager): Unit = lock synchronized {
     localLog.checkIfMemoryMappedBufferClosed()
-    Log.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, recordVersion, time,
+    UnifiedLog.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, recordVersion, time,
       reloadFromCleanShutdown = false, logIdent)
   }
 
@@ -1658,7 +1667,7 @@ class Log(@volatile var logStartOffset: Long,
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment]): Unit = {
     lock synchronized {
       localLog.checkIfMemoryMappedBufferClosed()
-      val deletedSegments = Log.replaceSegments(localLog.segments, newSegments, oldSegments, dir, topicPartition,
+      val deletedSegments = UnifiedLog.replaceSegments(localLog.segments, newSegments, oldSegments, dir, topicPartition,
         config, scheduler, logDirFailureChannel, logIdent)
       deleteProducerSnapshots(deletedSegments, asyncDelete = true)
     }
@@ -1699,17 +1708,17 @@ class Log(@volatile var logStartOffset: Long,
   }
 
   private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
-    val result = Log.splitOverflowedSegment(segment, localLog.segments, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
+    val result = UnifiedLog.splitOverflowedSegment(segment, localLog.segments, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
     deleteProducerSnapshots(result.deletedSegments, asyncDelete = true)
     result.newSegments.toList
   }
 
   private[log] def deleteProducerSnapshots(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
-    Log.deleteProducerSnapshots(segments, producerStateManager, asyncDelete, scheduler, config, logDirFailureChannel, parentDir, topicPartition)
+    UnifiedLog.deleteProducerSnapshots(segments, producerStateManager, asyncDelete, scheduler, config, logDirFailureChannel, parentDir, topicPartition)
   }
 }
 
-object Log extends Logging {
+object UnifiedLog extends Logging {
   val LogFileSuffix = LocalLog.LogFileSuffix
 
   val IndexFileSuffix = LocalLog.IndexFileSuffix
@@ -1747,17 +1756,17 @@ object Log extends Logging {
             logDirFailureChannel: LogDirFailureChannel,
             lastShutdownClean: Boolean = true,
             topicId: Option[Uuid],
-            keepPartitionMetadataFile: Boolean): Log = {
+            keepPartitionMetadataFile: Boolean): UnifiedLog = {
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
-    val topicPartition = Log.parseTopicPartitionName(dir)
+    val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
       dir,
       topicPartition,
       logDirFailureChannel,
       config.recordVersion,
-      s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
+      s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ")
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     val offsets = LogLoader.load(LoadLogParams(
       dir,
@@ -1775,7 +1784,7 @@ object Log extends Logging {
       producerStateManager))
     val localLog = new LocalLog(dir, config, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, scheduler, time, topicPartition, logDirFailureChannel)
-    new Log(offsets.logStartOffset,
+    new UnifiedLog(offsets.logStartOffset,
       localLog,
       brokerTopicStats,
       producerIdExpirationCheckIntervalMs,
@@ -2076,7 +2085,7 @@ object LogMetricNames {
   }
 }
 
-case class RetentionMsBreach(log: Log) extends SegmentDeletionReason {
+case class RetentionMsBreach(log: UnifiedLog) extends SegmentDeletionReason {
   override def logReason(toDelete: List[LogSegment]): Unit = {
     val retentionMs = log.config.retentionMs
     toDelete.foreach { segment =>
@@ -2092,7 +2101,7 @@ case class RetentionMsBreach(log: Log) extends SegmentDeletionReason {
   }
 }
 
-case class RetentionSizeBreach(log: Log) extends SegmentDeletionReason {
+case class RetentionSizeBreach(log: UnifiedLog) extends SegmentDeletionReason {
   override def logReason(toDelete: List[LogSegment]): Unit = {
     var size = log.size
     toDelete.foreach { segment =>
@@ -2103,7 +2112,7 @@ case class RetentionSizeBreach(log: Log) extends SegmentDeletionReason {
   }
 }
 
-case class StartOffsetBreach(log: Log) extends SegmentDeletionReason {
+case class StartOffsetBreach(log: UnifiedLog) extends SegmentDeletionReason {
   override def logReason(toDelete: List[LogSegment]): Unit = {
     log.info(s"Deleting segments due to log start offset ${log.logStartOffset} breach: ${toDelete.mkString(",")}")
   }
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index bbe0b10..c83aec6 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -16,7 +16,7 @@
  */
 package kafka.raft
 
-import kafka.log.{AppendOrigin, Defaults, Log, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
+import kafka.log.{AppendOrigin, Defaults, UnifiedLog, LogConfig, LogOffsetSnapshot, SnapshotGenerated}
 import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMinBytesProp}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, KafkaConfig, LogDirFailureChannel, RequestLocal}
 import kafka.utils.{CoreUtils, Logging, Scheduler}
@@ -36,7 +36,7 @@ import scala.collection.mutable
 import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
-  val log: Log,
+  val log: UnifiedLog,
   time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
@@ -563,7 +563,7 @@ object KafkaMetadataLog {
       throw new InvalidConfigurationException(s"Cannot set $MetadataLogSegmentBytesProp below ${config.logSegmentMinBytes}")
     }
 
-    val log = Log(
+    val log = UnifiedLog(
       dir = dataDir,
       config = defaultLogConfig,
       logStartOffset = 0L,
@@ -602,7 +602,7 @@ object KafkaMetadataLog {
   }
 
   private def recoverSnapshots(
-    log: Log
+    log: UnifiedLog
   ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
     val snapshots = mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]
     // Scan the log directory; deleting partial snapshots and older snapshot, only remembering immutable snapshots start
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 806b62d..279e596 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -21,7 +21,7 @@ import java.nio.file.Files
 import java.util
 import java.util.OptionalInt
 import java.util.concurrent.CompletableFuture
-import kafka.log.Log
+import kafka.log.UnifiedLog
 import kafka.raft.KafkaRaftManager.RaftIoThread
 import kafka.server.{KafkaConfig, MetaProperties}
 import kafka.server.KafkaRaftServer.ControllerRole
@@ -210,7 +210,7 @@ class KafkaRaftManager[T](
   }
 
   private def createDataDir(): File = {
-    val logDirName = Log.logDirName(topicPartition)
+    val logDirName = UnifiedLog.logDirName(topicPartition)
     KafkaRaftManager.createLogDirectory(new File(config.metadataLogDir), logDirName)
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 4e7e853..cda545d 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -19,7 +19,7 @@ package kafka.server
 import java.io.File
 import java.util.concurrent.CompletableFuture
 import kafka.common.{InconsistentNodeIdException, KafkaException}
-import kafka.log.Log
+import kafka.log.UnifiedLog
 import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
 import kafka.raft.KafkaRaftManager
 import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
@@ -155,7 +155,7 @@ object KafkaRaftServer {
         s"loaded from ${config.metadataLogDir}")
     }
 
-    val metadataPartitionDirName = Log.logDirName(MetadataPartition)
+    val metadataPartitionDirName = UnifiedLog.logDirName(MetadataPartition)
     val onlineNonMetadataDirs = logDirs.diff(offlineDirs :+ config.metadataLogDir)
     onlineNonMetadataDirs.foreach { logDir =>
       val metadataDir = new File(logDir, metadataPartitionDirName)
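
Both this file and RaftManager.scala above only re-home the directory-naming helper; its behavior is unchanged. A small illustrative sketch of the helper on its own, assuming the usual <topic>-<partition> naming and the __cluster_metadata KRaft metadata partition:

    import java.io.File

    import kafka.log.UnifiedLog
    import org.apache.kafka.common.TopicPartition

    object LogDirNameSketch {
      def main(args: Array[String]): Unit = {
        // Formerly Log.logDirName; maps a TopicPartition to its on-disk directory name.
        val metadataPartition = new TopicPartition("__cluster_metadata", 0)
        val dirName = UnifiedLog.logDirName(metadataPartition)

        // Resolve it under a placeholder metadata log directory.
        val metadataDir = new File("/tmp/kraft-metadata-logs", dirName)
        println(metadataDir.getPath) // expected: /tmp/kraft-metadata-logs/__cluster_metadata-0
      }
    }
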
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index 6423cfc..9400260 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.log.Log
+import kafka.log.UnifiedLog
 import org.apache.kafka.common.KafkaException
 
 object LogOffsetMetadata {
@@ -39,7 +39,7 @@ object LogOffsetMetadata {
  *  3. the physical position on the located segment
  */
 case class LogOffsetMetadata(messageOffset: Long,
-                             segmentBaseOffset: Long = Log.UnknownOffset,
+                             segmentBaseOffset: Long = UnifiedLog.UnknownOffset,
                              relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
 
   // check if this offset is already on an older segment compared with the given offset
@@ -76,7 +76,7 @@ case class LogOffsetMetadata(messageOffset: Long,
 
   // decide if the offset metadata only contains message offset info
   def messageOffsetOnly: Boolean = {
-    segmentBaseOffset == Log.UnknownOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
+    segmentBaseOffset == UnifiedLog.UnknownOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
   }
 
   override def toString = s"(offset=$messageOffset segment=[$segmentBaseOffset:$relativePositionInSegment])"
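
The only change here is where the UnknownOffset sentinel lives. A quick sketch of how the defaults behave, following the messageOffsetOnly definition above (the sentinel value -1 is assumed from the existing constants):

    import kafka.server.LogOffsetMetadata

    object LogOffsetMetadataSketch {
      def main(args: Array[String]): Unit = {
        // Only the message offset is known; segment fields fall back to
        // UnifiedLog.UnknownOffset and LogOffsetMetadata.UnknownFilePosition.
        val sparse = LogOffsetMetadata(messageOffset = 100L)
        println(sparse.messageOffsetOnly) // true
        println(sparse)                   // (offset=100 segment=[-1:-1])

        // Fully specified metadata is not "message offset only".
        val full = LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 50L, relativePositionInSegment = 512)
        println(full.messageOffsetOnly)   // false
      }
    }
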
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index be835a8..bfce8c4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -295,7 +295,7 @@ class ReplicaManager(val config: KafkaConfig,
     replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
   }
 
-  def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition)
+  def getLog(topicPartition: TopicPartition): Option[UnifiedLog] = logManager.getLog(topicPartition)
 
   def hasDelayedElectionOperations: Boolean = delayedElectLeaderPurgatory.numDelayed != 0
 
@@ -542,11 +542,11 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def localLogOrException(topicPartition: TopicPartition): Log = {
+  def localLogOrException(topicPartition: TopicPartition): UnifiedLog = {
     getPartitionOrException(topicPartition).localLogOrException
   }
 
-  def futureLocalLogOrException(topicPartition: TopicPartition): Log = {
+  def futureLocalLogOrException(topicPartition: TopicPartition): UnifiedLog = {
     getPartitionOrException(topicPartition).futureLocalLogOrException
   }
 
@@ -554,7 +554,7 @@ class ReplicaManager(val config: KafkaConfig,
     getPartitionOrException(topicPartition).futureLog.isDefined
   }
 
-  def localLog(topicPartition: TopicPartition): Option[Log] = {
+  def localLog(topicPartition: TopicPartition): Option[UnifiedLog] = {
     onlinePartition(topicPartition).flatMap(_.log)
   }
 
@@ -715,7 +715,7 @@ class ReplicaManager(val config: KafkaConfig,
           /* If the topic name is exceptionally long, we can't support altering the log directory.
            * See KAFKA-4893 for details.
            * TODO: fix this by implementing topic IDs. */
-          if (Log.logFutureDirName(topicPartition).size > 255)
+          if (UnifiedLog.logFutureDirName(topicPartition).size > 255)
             throw new InvalidTopicException("The topic name is too long.")
           if (!logManager.isLogDirOnline(destinationDir))
             throw new KafkaStorageException(s"Log directory $destinationDir is offline")
@@ -1190,10 +1190,10 @@ class ReplicaManager(val config: KafkaConfig,
                  _: InconsistentTopicIdException) =>
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
             divergingEpoch = None,
-            highWatermark = Log.UnknownOffset,
-            leaderLogStartOffset = Log.UnknownOffset,
-            leaderLogEndOffset = Log.UnknownOffset,
-            followerLogStartOffset = Log.UnknownOffset,
+            highWatermark = UnifiedLog.UnknownOffset,
+            leaderLogStartOffset = UnifiedLog.UnknownOffset,
+            leaderLogEndOffset = UnifiedLog.UnknownOffset,
+            followerLogStartOffset = UnifiedLog.UnknownOffset,
             fetchTimeMs = -1L,
             lastStableOffset = None,
             exception = Some(e))
@@ -1207,10 +1207,10 @@ class ReplicaManager(val config: KafkaConfig,
 
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
             divergingEpoch = None,
-            highWatermark = Log.UnknownOffset,
-            leaderLogStartOffset = Log.UnknownOffset,
-            leaderLogEndOffset = Log.UnknownOffset,
-            followerLogStartOffset = Log.UnknownOffset,
+            highWatermark = UnifiedLog.UnknownOffset,
+            leaderLogStartOffset = UnifiedLog.UnknownOffset,
+            leaderLogEndOffset = UnifiedLog.UnknownOffset,
+            followerLogStartOffset = UnifiedLog.UnknownOffset,
             fetchTimeMs = -1L,
             lastStableOffset = None,
             exception = Some(e))
@@ -1760,7 +1760,7 @@ class ReplicaManager(val config: KafkaConfig,
    * diverging epoch is returned in the response, avoiding the need for a separate
    * OffsetForLeaderEpoch request.
    */
-  protected def initialFetchOffset(log: Log): Long = {
+  protected def initialFetchOffset(log: UnifiedLog): Long = {
     if (ApiVersion.isTruncationOnFetchSupported(config.interBrokerProtocolVersion) && log.latestEpoch.nonEmpty)
       log.logEndOffset
     else
@@ -1831,7 +1831,7 @@ class ReplicaManager(val config: KafkaConfig,
   // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks(): Unit = {
     def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]],
-              log: Log): Unit = {
+              log: UnifiedLog): Unit = {
       val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir,
         new mutable.AnyRefMap[TopicPartition, Long]())
       checkpoints.put(log.topicPartition, log.highWatermark)
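
The putHw helper above now takes a UnifiedLog, but the grouping logic is the same: high watermarks are bucketed per parent log directory before each directory's checkpoint file is written. A standalone sketch of just that grouping step, mirroring the code above (the object name is illustrative):

    import scala.collection.mutable

    import kafka.log.UnifiedLog
    import org.apache.kafka.common.TopicPartition

    object HighWatermarkGroupingSketch {
      // Parent log dir -> (partition -> high watermark), as in putHw above.
      def groupHighWatermarks(logs: Iterable[UnifiedLog]): mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]] = {
        val logDirToCheckpoints = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]]()
        logs.foreach { log =>
          val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir,
            new mutable.AnyRefMap[TopicPartition, Long]())
          checkpoints.put(log.topicPartition, log.highWatermark)
        }
        logDirToCheckpoints
      }
    }
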
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 939251b..3f5ad74 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -19,7 +19,7 @@ package kafka.server.metadata
 
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.{Log, LogManager}
+import kafka.log.{UnifiedLog, LogManager}
 import kafka.server.ConfigType
 import kafka.server.{ConfigEntityName, ConfigHandler, FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Logging
@@ -64,7 +64,7 @@ object BrokerMetadataPublisher extends Logging {
    */
   def findStrayPartitions(brokerId: Int,
                           newTopicsImage: TopicsImage,
-                          logs: Iterable[Log]): Iterable[TopicPartition] = {
+                          logs: Iterable[UnifiedLog]): Iterable[TopicPartition] = {
     logs.flatMap { log =>
       val topicId = log.topicId.getOrElse {
         throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index e097206..b20c06a 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -57,16 +57,16 @@ object DumpLogSegments {
       val filename = file.getName
       val suffix = filename.substring(filename.lastIndexOf("."))
       suffix match {
-        case Log.LogFileSuffix =>
+        case UnifiedLog.LogFileSuffix =>
           dumpLog(file, opts.shouldPrintDataLog, nonConsecutivePairsForLogFilesMap, opts.isDeepIteration,
             opts.maxMessageSize, opts.messageParser, opts.skipRecordMetadata)
-        case Log.IndexFileSuffix =>
+        case UnifiedLog.IndexFileSuffix =>
           dumpIndex(file, opts.indexSanityOnly, opts.verifyOnly, misMatchesForIndexFilesMap, opts.maxMessageSize)
-        case Log.TimeIndexFileSuffix =>
+        case UnifiedLog.TimeIndexFileSuffix =>
           dumpTimeIndex(file, opts.indexSanityOnly, opts.verifyOnly, timeIndexDumpErrors, opts.maxMessageSize)
-        case Log.ProducerSnapshotFileSuffix =>
+        case UnifiedLog.ProducerSnapshotFileSuffix =>
           dumpProducerIdSnapshot(file)
-        case Log.TxnIndexFileSuffix =>
+        case UnifiedLog.TxnIndexFileSuffix =>
           dumpTxnIndex(file)
         case _ =>
           System.err.println(s"Ignoring unknown file $file")
@@ -91,7 +91,7 @@ object DumpLogSegments {
   }
 
   private def dumpTxnIndex(file: File): Unit = {
-    val index = new TransactionIndex(Log.offsetFromFile(file), file)
+    val index = new TransactionIndex(UnifiedLog.offsetFromFile(file), file)
     for (abortedTxn <- index.allAbortedTxns) {
       println(s"version: ${abortedTxn.version} producerId: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " +
         s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}")
@@ -123,7 +123,7 @@ object DumpLogSegments {
                                misMatchesForIndexFilesMap: mutable.Map[String, List[(Long, Long)]],
                                maxMessageSize: Int): Unit = {
     val startOffset = file.getName.split("\\.")(0).toLong
-    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
+    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix)
     val fileRecords = FileRecords.open(logFile, false)
     val index = new OffsetIndex(file, baseOffset = startOffset, writable = false)
 
@@ -165,9 +165,9 @@ object DumpLogSegments {
                                    timeIndexDumpErrors: TimeIndexDumpErrors,
                                    maxMessageSize: Int): Unit = {
     val startOffset = file.getName.split("\\.")(0).toLong
-    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
+    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.LogFileSuffix)
     val fileRecords = FileRecords.open(logFile, false)
-    val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.IndexFileSuffix)
+    val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + UnifiedLog.IndexFileSuffix)
     val index = new OffsetIndex(indexFile, baseOffset = startOffset, writable = false)
     val timeIndex = new TimeIndex(file, baseOffset = startOffset, writable = false)
 
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 333b56e..a6b59f0 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -13,7 +13,7 @@
 package kafka.api
 
 import kafka.integration.KafkaServerTestHarness
-import kafka.log.Log
+import kafka.log.UnifiedLog
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
@@ -45,7 +45,7 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
     ).asJava
     consumer.commitSync(offsetMap)
     val logManager = servers.head.getLogManager
-    def getGroupMetadataLogOpt: Option[Log] =
+    def getGroupMetadataLogOpt: Option[UnifiedLog] =
       logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
 
     TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.batches.asScala.nonEmpty)),
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index e02529b..526c499 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.raft
 
-import kafka.log.{Defaults, Log, SegmentDeletion}
+import kafka.log.{Defaults, UnifiedLog, SegmentDeletion}
 import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp}
 import kafka.server.{KafkaConfig, KafkaRaftServer}
 import kafka.utils.{MockTime, TestUtils}
@@ -905,7 +905,7 @@ object KafkaMetadataLogTest {
 
     val logDir = createLogDirectory(
       tempDir,
-      Log.logDirName(KafkaRaftServer.MetadataPartition)
+      UnifiedLog.logDirName(KafkaRaftServer.MetadataPartition)
     )
 
     val metadataLog = KafkaMetadataLog(
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index e3c1e65..2422dcc 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -42,7 +42,7 @@ object StressTestLog {
     logProperties.put(LogConfig.MaxMessageBytesProp, Int.MaxValue: java.lang.Integer)
     logProperties.put(LogConfig.SegmentIndexBytesProp, 1024*1024: java.lang.Integer)
 
-    val log = Log(dir = dir,
+    val log = UnifiedLog(dir = dir,
       config = LogConfig(logProperties),
       logStartOffset = 0L,
       recoveryPoint = 0L,
@@ -118,7 +118,7 @@ object StressTestLog {
     }
   }
 
-  class WriterThread(val log: Log) extends WorkerThread with LogProgress {
+  class WriterThread(val log: UnifiedLog) extends WorkerThread with LogProgress {
     override def work(): Unit = {
       val logAppendInfo = log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes), 0)
       require(logAppendInfo.firstOffset.forall(_.messageOffset == currentOffset) && logAppendInfo.lastOffset == currentOffset)
@@ -128,7 +128,7 @@ object StressTestLog {
     }
   }
 
-  class ReaderThread(val log: Log) extends WorkerThread with LogProgress {
+  class ReaderThread(val log: UnifiedLog) extends WorkerThread with LogProgress {
     override def work(): Unit = {
       try {
         log.read(currentOffset,
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index cb2180a..f274954 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -210,7 +210,7 @@ object TestLinearWriteSpeed {
 
   class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
     Utils.delete(dir)
-    val log = Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM, 60 * 60 * 1000,
+    val log = UnifiedLog(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM, 60 * 60 * 1000,
       LogManager.ProducerIdExpirationCheckIntervalMs, new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true)
     def write(): Int = {
       log.appendAsLeader(messages, leaderEpoch = 0)
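
Both stress tools exercise the same append path; only the type name changes. A short sketch of appending a single-record batch to an existing UnifiedLog, reusing the record-building pattern that appears later in this patch (PartitionTest.seedLogData); the log instance itself is assumed to have been built as in the call sites above:

    import kafka.log.UnifiedLog
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    object AppendAsLeaderSketch {
      def appendOne(log: UnifiedLog, leaderEpoch: Int): Unit = {
        // Build a single-record batch at base offset 0 with no compression.
        val records = MemoryRecords.withRecords(0L, CompressionType.NONE, leaderEpoch,
          new SimpleRecord("k".getBytes, "v".getBytes))
        val info = log.appendAsLeader(records, leaderEpoch = leaderEpoch)
        println(s"appended through offset ${info.lastOffset}; log end offset is now ${log.logEndOffset}")
      }
    }
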
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 98ca81a..4ce7658 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.ExecutionException
 import java.util.{Collections, Optional, Properties}
 
 import scala.collection.Seq
-import kafka.log.Log
+import kafka.log.UnifiedLog
 import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
 import kafka.utils.TestUtils
 import kafka.server.{KafkaConfig, KafkaServer}
@@ -389,7 +389,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     servers
   }
 
-  private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
+  private def writeDups(numKeys: Int, numDups: Int, log: UnifiedLog): Seq[(Int, Int)] = {
     var counter = 0
     for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index a791141..83165ef 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -280,11 +280,11 @@ class PartitionLockTest extends Logging {
         }
       }
 
-      override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Log = {
+      override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
         val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
         val logDirFailureChannel = new LogDirFailureChannel(1)
         val segments = new LogSegments(log.topicPartition)
-        val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
+        val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
         val maxProducerIdExpirationMs = 60 * 60 * 1000
         val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs)
         val offsets = LogLoader.load(LoadLogParams(
@@ -367,13 +367,13 @@ class PartitionLockTest extends Logging {
   }
 
   private class SlowLog(
-    log: Log,
+    log: UnifiedLog,
     logStartOffset: Long,
     localLog: LocalLog,
     leaderEpochCache: Option[LeaderEpochFileCache],
     producerStateManager: ProducerStateManager,
     appendSemaphore: Semaphore
-  ) extends Log(
+  ) extends UnifiedLog(
     logStartOffset,
     localLog,
     new BrokerTopicStats,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b671d34..65f4148 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -207,11 +207,11 @@ class PartitionTest extends AbstractPartitionTest {
       logManager,
       alterIsrManager) {
 
-      override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Log = {
+      override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
         val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
         val logDirFailureChannel = new LogDirFailureChannel(1)
         val segments = new LogSegments(log.topicPartition)
-        val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
+        val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
         val maxProducerIdExpirationMs = 60 * 60 * 1000
         val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs)
         val offsets = LogLoader.load(LoadLogParams(
@@ -1042,7 +1042,7 @@ class PartitionTest extends AbstractPartitionTest {
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
-    assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
 
     time.sleep(500)
 
@@ -1096,7 +1096,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
-    assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
 
     partition.updateFollowerFetchState(remoteBrokerId,
       followerFetchOffsetMetadata = LogOffsetMetadata(3),
@@ -1157,7 +1157,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
-    assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
 
     partition.updateFollowerFetchState(remoteBrokerId,
       followerFetchOffsetMetadata = LogOffsetMetadata(10),
@@ -1214,7 +1214,7 @@ class PartitionTest extends AbstractPartitionTest {
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
-    assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
 
     // On initialization, the replica is considered caught up and should not be removed
     partition.maybeShrinkIsr()
@@ -1261,7 +1261,7 @@ class PartitionTest extends AbstractPartitionTest {
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
-    assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
 
     // Shrink the ISR
     time.sleep(partition.replicaLagTimeMaxMs + 1)
@@ -1321,7 +1321,7 @@ class PartitionTest extends AbstractPartitionTest {
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
-    assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
 
     // There is a short delay before the first fetch. The follower is not yet caught up to the log end.
     time.sleep(5000)
@@ -1386,7 +1386,7 @@ class PartitionTest extends AbstractPartitionTest {
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
-    assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
 
     // The follower catches up to the log end immediately.
     partition.updateFollowerFetchState(remoteBrokerId,
@@ -1437,7 +1437,7 @@ class PartitionTest extends AbstractPartitionTest {
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(initializeTimeMs, remoteReplica.lastCaughtUpTimeMs)
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
-    assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
 
     time.sleep(30001)
 
@@ -1513,7 +1513,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
     assertEquals(LogOffsetMetadata.UnknownOffsetMetadata.messageOffset, remoteReplica.logEndOffset)
-    assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)
+    assertEquals(UnifiedLog.UnknownOffset, remoteReplica.logStartOffset)
 
     // This will attempt to expand the ISR
     partition.updateFollowerFetchState(remoteBrokerId,
@@ -1960,7 +1960,7 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
 
-  private def seedLogData(log: Log, numRecords: Int, leaderEpoch: Int): Unit = {
+  private def seedLogData(log: UnifiedLog, numRecords: Int, leaderEpoch: Int): Unit = {
     for (i <- 0 until numRecords) {
       val records = MemoryRecords.withRecords(0L, CompressionType.NONE, leaderEpoch,
         new SimpleRecord(s"k$i".getBytes, s"v$i".getBytes))
@@ -1969,13 +1969,13 @@ class PartitionTest extends AbstractPartitionTest {
   }
 
   private class SlowLog(
-    log: Log,
+    log: UnifiedLog,
     logStartOffset: Long,
     localLog: LocalLog,
     leaderEpochCache: Option[LeaderEpochFileCache],
     producerStateManager: ProducerStateManager,
     appendSemaphore: Semaphore
-  ) extends Log(
+  ) extends UnifiedLog(
     logStartOffset,
     localLog,
     new BrokerTopicStats,
diff --git a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
index 9ec9071..08d0950 100644
--- a/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import java.util.Properties
 
-import kafka.log.{ClientRecordDeletion, Log, LogConfig, LogManager}
+import kafka.log.{ClientRecordDeletion, UnifiedLog, LogConfig, LogManager}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
@@ -32,7 +32,7 @@ class ReplicaTest {
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val time = new MockTime()
   val brokerTopicStats = new BrokerTopicStats
-  var log: Log = _
+  var log: UnifiedLog = _
 
   @BeforeEach
   def setup(): Unit = {
@@ -41,7 +41,7 @@ class ReplicaTest {
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
     logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
     val config = LogConfig(logProps)
-    log = Log(logDir,
+    log = UnifiedLog(logDir,
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index fac34a1..ddd3c18 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.Lock
 
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
-import kafka.log.{AppendOrigin, Log, LogConfig}
+import kafka.log.{AppendOrigin, UnifiedLog, LogConfig}
 import kafka.server._
 import kafka.utils._
 import kafka.utils.timer.MockTimer
@@ -160,7 +160,7 @@ object AbstractCoordinatorConcurrencyTest {
   class TestReplicaManager extends ReplicaManager(
     null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, None, null) {
 
-    @volatile var logs: mutable.Map[TopicPartition, (Log, Long)] = _
+    @volatile var logs: mutable.Map[TopicPartition, (UnifiedLog, Long)] = _
     var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _
     var watchKeys: mutable.Set[TopicPartitionOperationKey] = _
 
@@ -212,13 +212,13 @@ object AbstractCoordinatorConcurrencyTest {
       Some(RecordBatch.MAGIC_VALUE_V2)
     }
 
-    def getOrCreateLogs(): mutable.Map[TopicPartition, (Log, Long)] = {
+    def getOrCreateLogs(): mutable.Map[TopicPartition, (UnifiedLog, Long)] = {
       if (logs == null)
-        logs = mutable.Map[TopicPartition, (Log, Long)]()
+        logs = mutable.Map[TopicPartition, (UnifiedLog, Long)]()
       logs
     }
 
-    def updateLog(topicPartition: TopicPartition, log: Log, endOffset: Long): Unit = {
+    def updateLog(topicPartition: TopicPartition, log: UnifiedLog, endOffset: Long): Unit = {
       getOrCreateLogs().put(topicPartition, (log, endOffset))
     }
 
@@ -226,7 +226,7 @@ object AbstractCoordinatorConcurrencyTest {
       getOrCreateLogs().get(topicPartition).map(_._1.config)
     }
 
-    override def getLog(topicPartition: TopicPartition): Option[Log] =
+    override def getLog(topicPartition: TopicPartition): Option[UnifiedLog] =
       getOrCreateLogs().get(topicPartition).map(l => l._1)
 
     override def getLogEndOffset(topicPartition: TopicPartition): Option[Long] =
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index bf475cc..5fe4bf9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -27,7 +27,7 @@ import javax.management.ObjectName
 import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
-import kafka.log.{AppendOrigin, Log, LogAppendInfo}
+import kafka.log.{AppendOrigin, UnifiedLog, LogAppendInfo}
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata, ReplicaManager, RequestLocal}
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
@@ -823,7 +823,7 @@ class GroupMetadataManagerTest {
     val endOffset = 10L
     val groupEpoch = 2
 
-    val logMock: Log = EasyMock.mock(classOf[Log])
+    val logMock: UnifiedLog = EasyMock.mock(classOf[UnifiedLog])
     EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
     expectGroupMetadataLoad(logMock, startOffset, MemoryRecords.EMPTY)
     EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(endOffset))
@@ -891,7 +891,7 @@ class GroupMetadataManagerTest {
     val tp2 = new TopicPartition("bar", 0)
     val tp3 = new TopicPartition("xxx", 0)
 
-    val logMock: Log = EasyMock.mock(classOf[Log])
+    val logMock: UnifiedLog = EasyMock.mock(classOf[UnifiedLog])
     EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
 
     val segment1MemberId = "a"
@@ -2366,7 +2366,7 @@ class GroupMetadataManagerTest {
     EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + records.sizeInBytes()).anyTimes()
     EasyMock.replay(mockRecords)
 
-    val logMock: Log = EasyMock.mock(classOf[Log])
+    val logMock: UnifiedLog = EasyMock.mock(classOf[UnifiedLog])
     EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
       maxLength = EasyMock.anyInt(),
@@ -2512,7 +2512,7 @@ class GroupMetadataManagerTest {
   private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
                                       startOffset: Long,
                                       records: MemoryRecords): Unit = {
-    val logMock: Log =  EasyMock.mock(classOf[Log])
+    val logMock: UnifiedLog =  EasyMock.mock(classOf[UnifiedLog])
     EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
     val endOffset = expectGroupMetadataLoad(logMock, startOffset, records)
     EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
@@ -2524,7 +2524,7 @@ class GroupMetadataManagerTest {
    *
    * @return the calculated end offset to be mocked into [[ReplicaManager.getLogEndOffset]]
    */
-  private def expectGroupMetadataLoad(logMock: Log,
+  private def expectGroupMetadataLoad(logMock: UnifiedLog,
                                       startOffset: Long,
                                       records: MemoryRecords): Long = {
     val endOffset = startOffset + records.records.asScala.size
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 3727a2a..85778d5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._
-import kafka.log.{Log, LogConfig}
+import kafka.log.{UnifiedLog, LogConfig}
 import kafka.server.{FetchDataInfo, FetchLogEnd, KafkaConfig, LogOffsetMetadata, MetadataCache, RequestLocal}
 import kafka.utils.{Pool, TestUtils}
 import org.apache.kafka.clients.{ClientResponse, NetworkClient}
@@ -458,7 +458,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
   }
 
   private def prepareTxnLog(partitionId: Int): Unit = {
-    val logMock: Log =  EasyMock.mock(classOf[Log])
+    val logMock: UnifiedLog =  EasyMock.mock(classOf[UnifiedLog])
     EasyMock.expect(logMock.config).andStubReturn(new LogConfig(Collections.emptyMap()))
 
     val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 21629bd..32e41cd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
 import java.util.concurrent.locks.ReentrantLock
 
 import javax.management.ObjectName
-import kafka.log.{AppendOrigin, Defaults, Log, LogConfig}
+import kafka.log.{AppendOrigin, Defaults, UnifiedLog, LogConfig}
 import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager, RequestLocal}
 import kafka.utils.{MockScheduler, Pool, TestUtils}
 import kafka.zk.KafkaZkClient
@@ -150,7 +150,7 @@ class TransactionStateManagerTest {
     val endOffset = 1L
 
     val fileRecordsMock = EasyMock.mock[FileRecords](classOf[FileRecords])
-    val logMock = EasyMock.mock[Log](classOf[Log])
+    val logMock = EasyMock.mock[UnifiedLog](classOf[UnifiedLog])
     EasyMock.expect(replicaManager.getLog(topicPartition)).andStubReturn(Some(logMock))
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
@@ -813,7 +813,7 @@ class TransactionStateManagerTest {
     val startOffset = 0L
     val endOffset = 10L
 
-    val logMock: Log = EasyMock.mock(classOf[Log])
+    val logMock: UnifiedLog = EasyMock.mock(classOf[UnifiedLog])
     EasyMock.expect(replicaManager.getLog(topicPartition)).andStubReturn(Some(logMock))
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
@@ -986,7 +986,7 @@ class TransactionStateManagerTest {
                             records: MemoryRecords): Unit = {
     EasyMock.reset(replicaManager)
 
-    val logMock: Log = EasyMock.mock(classOf[Log])
+    val logMock: UnifiedLog = EasyMock.mock(classOf[UnifiedLog])
     val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
 
     val endOffset = startOffset + records.records.asScala.size
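
The coordinator tests above swap only the mocked class; the EasyMock pattern itself is untouched. A condensed sketch of that pattern for stubbing a log lookup, assuming replicaManager is itself an EasyMock mock as it is in these tests (the helper name is illustrative):

    import kafka.log.UnifiedLog
    import kafka.server.ReplicaManager
    import org.apache.kafka.common.TopicPartition
    import org.easymock.EasyMock

    object UnifiedLogMockSketch {
      def stubLog(replicaManager: ReplicaManager, topicPartition: TopicPartition, startOffset: Long): UnifiedLog = {
        // Mock the renamed class and stub the lookups the state managers perform.
        val logMock: UnifiedLog = EasyMock.mock(classOf[UnifiedLog])
        EasyMock.expect(replicaManager.getLog(topicPartition)).andStubReturn(Some(logMock))
        EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
        // The real tests record more expectations (e.g. reads) before replaying.
        EasyMock.replay(logMock)
        logMock
      }
    }
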
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 99c85b6..2fc7942 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -37,7 +37,7 @@ abstract class AbstractLogCleanerIntegrationTest {
   var cleaner: LogCleaner = _
   val logDir = TestUtils.tempDir()
 
-  private val logs = ListBuffer.empty[Log]
+  private val logs = ListBuffer.empty[UnifiedLog]
   private val defaultMaxMessageSize = 128
   private val defaultMinCleanableDirtyRatio = 0.0F
   private val defaultMinCompactionLagMS = 0L
@@ -89,7 +89,7 @@ abstract class AbstractLogCleanerIntegrationTest {
                   cleanerIoBufferSize: Option[Int] = None,
                   propertyOverrides: Properties = new Properties()): LogCleaner = {
 
-    val logMap = new Pool[TopicPartition, Log]()
+    val logMap = new Pool[TopicPartition, UnifiedLog]()
     for (partition <- partitions) {
       val dir = new File(logDir, s"${partition.topic}-${partition.partition}")
       Files.createDirectories(dir.toPath)
@@ -101,7 +101,7 @@ abstract class AbstractLogCleanerIntegrationTest {
         deleteDelay = deleteDelay,
         segmentSize = segmentSize,
         maxCompactionLagMs = maxCompactionLagMs))
-      val log = Log(dir,
+      val log = UnifiedLog(dir,
         logConfig,
         logStartOffset = 0L,
         recoveryPoint = 0L,
@@ -133,8 +133,8 @@ abstract class AbstractLogCleanerIntegrationTest {
   def counter: Int = ctr
   def incCounter(): Unit = ctr += 1
 
-  def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
+  def writeDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType,
+                startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
     for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val value = counter.toString
       val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec,
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index af6265c..f308b54 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -52,7 +52,7 @@ class BrokerCompressionTest {
     val logProps = new Properties()
     logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
     /*configure broker-side compression  */
-    val log = Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+    val log = UnifiedLog(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index b969740..16b219c 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -178,7 +178,7 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
       s"log cleaner should have processed at least to offset $secondBlockCleanableSegmentOffset, but lastCleaned=$lastCleaned2")
   }
 
-  private def readFromLog(log: Log): Iterable[(Int, Int)] = {
+  private def readFromLog(log: UnifiedLog): Iterable[(Int, Int)] = {
     for (segment <- log.logSegments; record <- segment.log.records.asScala) yield {
       val key = TestUtils.readString(record.key).toInt
       val value = TestUtils.readString(record.value).toInt
@@ -186,7 +186,7 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
     }
   }
 
-  private def writeKeyDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] = {
+  private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType, timestamp: Long, startValue: Int, step: Int): Seq[(Int, Int)] = {
     var valCounter = startValue
     for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val curValue = valCounter
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 6f82e22..c077542 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -93,7 +93,7 @@ class LogCleanerLagIntegrationTest extends AbstractLogCleanerIntegrationTest wit
     assertTrue(sizeUpToActiveSegmentAtT0 > compactedSize, s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize")
   }
 
-  private def readFromLog(log: Log): Iterable[(Int, Int)] = {
+  private def readFromLog(log: UnifiedLog): Iterable[(Int, Int)] = {
     for (segment <- log.logSegments; record <- segment.log.records.asScala) yield {
       val key = TestUtils.readString(record.key).toInt
       val value = TestUtils.readString(record.value).toInt
@@ -101,7 +101,7 @@ class LogCleanerLagIntegrationTest extends AbstractLogCleanerIntegrationTest wit
     }
   }
 
-  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionType, timestamp: Long): Seq[(Int, Int)] = {
+  private def writeDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType, timestamp: Long): Seq[(Int, Int)] = {
     for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
       log.appendAsLeader(TestUtils.singletonRecords(value = counter.toString.getBytes, codec = codec,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 83c8c50..93d9713 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -53,7 +53,7 @@ class LogCleanerManagerTest extends Logging {
   val cleanerCheckpoints: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
 
   class LogCleanerManagerMock(logDirs: Seq[File],
-                              logs: Pool[TopicPartition, Log],
+                              logs: Pool[TopicPartition, UnifiedLog],
                               logDirFailureChannel: LogDirFailureChannel) extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
     override def allCleanerCheckpoints: Map[TopicPartition, Long] = {
       cleanerCheckpoints.toMap
@@ -75,8 +75,8 @@ class LogCleanerManagerTest extends Logging {
 
   private def setupIncreasinglyFilthyLogs(partitions: Seq[TopicPartition],
                                           startNumBatches: Int,
-                                          batchIncrement: Int): Pool[TopicPartition, Log] = {
-    val logs = new Pool[TopicPartition, Log]()
+                                          batchIncrement: Int): Pool[TopicPartition, UnifiedLog] = {
+    val logs = new Pool[TopicPartition, UnifiedLog]()
     var numBatches = startNumBatches
 
     for (tp <- partitions) {
@@ -100,7 +100,7 @@ class LogCleanerManagerTest extends Logging {
     val config = createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact)
     val maxProducerIdExpirationMs = 60 * 60 * 1000
     val segments = new LogSegments(tp)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs, time)
     val offsets = LogLoader.load(LoadLogParams(
       tpDir,
@@ -119,7 +119,7 @@ class LogCleanerManagerTest extends Logging {
     val localLog = new LocalLog(tpDir, config, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, time.scheduler, time, tp, logDirFailureChannel)
     // the exception should be caught and the partition that caused it marked as uncleanable
-    class LogMock extends Log(offsets.logStartOffset, localLog, new BrokerTopicStats,
+    class LogMock extends UnifiedLog(offsets.logStartOffset, localLog, new BrokerTopicStats,
         LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
         producerStateManager, _topicId = None, keepPartitionMetadataFile = true) {
       // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
@@ -127,14 +127,14 @@ class LogCleanerManagerTest extends Logging {
         throw new IllegalStateException("Error!")
     }
 
-    val log: Log = new LogMock()
+    val log: UnifiedLog = new LogMock()
     writeRecords(log = log,
       numBatches = logSegmentsCount * 2,
       recordsPerBatch = 10,
       batchesPerSegment = 2
     )
 
-    val logsPool = new Pool[TopicPartition, Log]()
+    val logsPool = new Pool[TopicPartition, UnifiedLog]()
     logsPool.put(tp, log)
     val cleanerManager = createCleanerManagerMock(logsPool)
     cleanerCheckpoints.put(tp, 1)
@@ -249,7 +249,7 @@ class LogCleanerManagerTest extends Logging {
     val tp = new TopicPartition("foo", 0)
     val log = createLog(segmentSize = 2048, LogConfig.Compact, tp)
 
-    val logs = new Pool[TopicPartition, Log]()
+    val logs = new Pool[TopicPartition, UnifiedLog]()
     logs.put(tp, log)
 
     appendRecords(log, numRecords = 3)
@@ -276,7 +276,7 @@ class LogCleanerManagerTest extends Logging {
 
     val tp = new TopicPartition("foo", 0)
 
-    val logs = new Pool[TopicPartition, Log]()
+    val logs = new Pool[TopicPartition, UnifiedLog]()
     val log = createLog(2048, LogConfig.Compact, topicPartition = tp)
     logs.put(tp, log)
 
@@ -302,7 +302,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     val readyToDelete = cleanerManager.deletableLogs().size
@@ -315,7 +315,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     val readyToDelete = cleanerManager.deletableLogs().size
@@ -329,7 +329,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     val readyToDelete = cleanerManager.deletableLogs().size
@@ -342,7 +342,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testLogsUnderCleanupIneligibleForCompaction(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     log.appendAsLeader(records, leaderEpoch = 0)
@@ -390,7 +390,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
@@ -404,7 +404,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     // write some data into the cleaner-offset-checkpoint file
@@ -419,7 +419,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
@@ -437,7 +437,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
     val lowerOffset = 1L
     val higherOffset = 1000L
@@ -457,7 +457,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testAlterCheckpointDirShouldRemoveDataInSrcDirAndAddInNewDir(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     // write some data into the cleaner-offset-checkpoint file in logDir
@@ -480,7 +480,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testConcurrentLogCleanupAndLogTruncation(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     // log cleanup starts
@@ -500,7 +500,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Delete)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     // log cleanup starts
@@ -520,7 +520,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testLogsWithSegmentsToDeleteShouldNotConsiderUncleanablePartitions(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Compact)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
     cleanerManager.markPartitionUncleanable(log.dir.getParent, topicPartition)
 
@@ -695,7 +695,7 @@ class LogCleanerManagerTest extends Logging {
   @Test
   def testDoneDeleting(): Unit = {
     val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
-    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
+    val log: UnifiedLog = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
     val tp = new TopicPartition("log", 0)
 
@@ -752,23 +752,23 @@ class LogCleanerManagerTest extends Logging {
     assertEquals(15L, cleanerCheckpoints.get(tp0).get, "Unselected log should have checkpoint offset updated")
   }
 
-  private def createCleanerManager(log: Log): LogCleanerManager = {
-    val logs = new Pool[TopicPartition, Log]()
+  private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {
+    val logs = new Pool[TopicPartition, UnifiedLog]()
     logs.put(topicPartition, log)
     new LogCleanerManager(Seq(logDir, logDir2), logs, null)
   }
 
-  private def createCleanerManagerMock(pool: Pool[TopicPartition, Log]): LogCleanerManagerMock = {
+  private def createCleanerManagerMock(pool: Pool[TopicPartition, UnifiedLog]): LogCleanerManagerMock = {
     new LogCleanerManagerMock(Seq(logDir), pool, null)
   }
 
   private def createLog(segmentSize: Int,
                         cleanupPolicy: String,
-                        topicPartition: TopicPartition = new TopicPartition("log", 0)): Log = {
+                        topicPartition: TopicPartition = new TopicPartition("log", 0)): UnifiedLog = {
     val config = createLowRetentionLogConfig(segmentSize, cleanupPolicy)
-    val partitionDir = new File(logDir, Log.logDirName(topicPartition))
+    val partitionDir = new File(logDir, UnifiedLog.logDirName(topicPartition))
 
-    Log(partitionDir,
+    UnifiedLog(partitionDir,
       config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
@@ -792,7 +792,7 @@ class LogCleanerManagerTest extends Logging {
     LogConfig(logProps)
   }
 
-  private def writeRecords(log: Log,
+  private def writeRecords(log: UnifiedLog,
                            numBatches: Int,
                            recordsPerBatch: Int,
                            batchesPerSegment: Int): Unit = {
@@ -804,7 +804,7 @@ class LogCleanerManagerTest extends Logging {
     log.roll()
   }
 
-  private def appendRecords(log: Log, numRecords: Int): Unit = {
+  private def appendRecords(log: UnifiedLog, numRecords: Int): Unit = {
     val startOffset = log.logEndOffset
     val endOffset = startOffset + numRecords
     var lastTimestamp = 0L
@@ -820,7 +820,7 @@ class LogCleanerManagerTest extends Logging {
   }
 
   private def makeLog(dir: File = logDir, config: LogConfig) =
-    Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+    UnifiedLog(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 7d33b09..1471ff1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -93,7 +93,7 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
     logProps.put(LogConfig.CleanupPolicyProp, "compact,delete")
 
-    def runCleanerAndCheckCompacted(numKeys: Int): (Log, Seq[(Int, String, Long)]) = {
+    def runCleanerAndCheckCompacted(numKeys: Int): (UnifiedLog, Seq[(Int, String, Long)]) = {
       cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backOffMs = 100L)
       val log = cleaner.logs.get(topicPartitions(0))
 
@@ -280,7 +280,7 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     assertTrue(lastCleaned >= firstDirty, s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned")
   }
 
-  private def checkLogAfterAppendingDups(log: Log, startSize: Long, appends: Seq[(Int, String, Long)]): Unit = {
+  private def checkLogAfterAppendingDups(log: UnifiedLog, startSize: Long, appends: Seq[(Int, String, Long)]): Unit = {
     val read = readFromLog(log)
     assertEquals(toMap(appends), toMap(read), "Contents of the map shouldn't change")
     assertTrue(startSize > log.size)
@@ -290,7 +290,7 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     messages.map { case (key, value, offset) => key -> (value, offset) }.toMap
   }
 
-  private def readFromLog(log: Log): Iterable[(Int, String, Long)] = {
+  private def readFromLog(log: UnifiedLog): Iterable[(Int, String, Long)] = {
     for (segment <- log.logSegments; deepLogEntry <- segment.log.records.asScala) yield {
       val key = TestUtils.readString(deepLogEntry.key).toInt
       val value = TestUtils.readString(deepLogEntry.value)
@@ -298,7 +298,7 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     }
   }
 
-  private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
+  private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType,
                                         startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = {
     val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
       val payload = counter.toString
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 610f242..0a95735 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -101,11 +101,11 @@ class LogCleanerTest {
     logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
     val config = LogConfig.fromProps(logConfig.originals, logProps)
-    val topicPartition = Log.parseTopicPartitionName(dir)
+    val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val maxProducerIdExpirationMs = 60 * 60 * 1000
     val logSegments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs, time)
     val offsets = LogLoader.load(LoadLogParams(
       dir,
@@ -123,7 +123,7 @@ class LogCleanerTest {
       producerStateManager))
     val localLog = new LocalLog(dir, config, logSegments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, time.scheduler, time, topicPartition, logDirFailureChannel)
-    val log = new Log(offsets.logStartOffset,
+    val log = new UnifiedLog(offsets.logStartOffset,
                       localLog,
                       brokerTopicStats = new BrokerTopicStats,
                       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
@@ -162,7 +162,7 @@ class LogCleanerTest {
 
     // Remember reference to the first log and determine its file name expected for async deletion
     val firstLogFile = log.logSegments.head.log
-    val expectedFileName = CoreUtils.replaceSuffix(firstLogFile.file.getPath, "", Log.DeletedFileSuffix)
+    val expectedFileName = CoreUtils.replaceSuffix(firstLogFile.file.getPath, "", UnifiedLog.DeletedFileSuffix)
 
     // Clean the log. This should trigger replaceSegments() and deleteOldSegments();
     val offsetMap = new FakeOffsetMap(Int.MaxValue)
@@ -781,7 +781,7 @@ class LogCleanerTest {
     )
   }
 
-  def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): (Log, FakeOffsetMap) = {
+  def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): (UnifiedLog, FakeOffsetMap) = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, largeMessageSize * 16: java.lang.Integer)
     logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize * 2: java.lang.Integer)
@@ -1070,22 +1070,22 @@ class LogCleanerTest {
     assertEquals(numInvalidMessages, stats.invalidMessagesRead, "Cleaner should have seen %d invalid messages.")
   }
 
-  def lastOffsetsPerBatchInLog(log: Log): Iterable[Long] = {
+  def lastOffsetsPerBatchInLog(log: UnifiedLog): Iterable[Long] = {
     for (segment <- log.logSegments; batch <- segment.log.batches.asScala)
       yield batch.lastOffset
   }
 
-  def lastSequencesInLog(log: Log): Map[Long, Int] = {
+  def lastSequencesInLog(log: UnifiedLog): Map[Long, Int] = {
     (for (segment <- log.logSegments;
           batch <- segment.log.batches.asScala if !batch.isControlBatch && batch.hasProducerId)
       yield batch.producerId -> batch.lastSequence).toMap
   }
 
   /* extract all the offsets from a log */
-  def offsetsInLog(log: Log): Iterable[Long] =
+  def offsetsInLog(log: UnifiedLog): Iterable[Long] =
     log.logSegments.flatMap(s => s.log.records.asScala.filter(_.hasValue).filter(_.hasKey).map(m => m.offset))
 
-  def unkeyedMessageCountInLog(log: Log) =
+  def unkeyedMessageCountInLog(log: UnifiedLog) =
     log.logSegments.map(s => s.log.records.asScala.filter(_.hasValue).count(m => !m.hasKey)).sum
 
   def abortCheckDone(topicPartition: TopicPartition): Unit = {
@@ -1430,9 +1430,9 @@ class LogCleanerTest {
 
     // 1) Simulate recovery just after .cleaned file is created, before rename to .swap
     //    On recovery, clean operation is aborted. All messages should be present in the log
-    log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
-    for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)
+    log.logSegments.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
+    for (file <- dir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix)) {
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
     }
     log = recoverAndCheck(config, allKeys)
 
@@ -1446,10 +1446,10 @@ class LogCleanerTest {
 
     // 2) Simulate recovery just after .cleaned file is created, and a subset of them are renamed to .swap
     //    On recovery, clean operation is aborted. All messages should be present in the log
-    log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
-    log.logSegments.head.log.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.log.file.getPath, Log.CleanedFileSuffix, Log.SwapFileSuffix)))
-    for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)
+    log.logSegments.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
+    log.logSegments.head.log.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.log.file.getPath, UnifiedLog.CleanedFileSuffix, UnifiedLog.SwapFileSuffix)))
+    for (file <- dir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix)) {
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
     }
     log = recoverAndCheck(config, allKeys)
 
@@ -1463,9 +1463,9 @@ class LogCleanerTest {
 
     // 3) Simulate recovery just after swap file is created, before old segment files are
     //    renamed to .deleted. Clean operation is resumed during recovery.
-    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
-    for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)
+    log.logSegments.head.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
+    for (file <- dir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix)) {
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
     }
     log = recoverAndCheck(config, cleanedKeys)
 
@@ -1484,7 +1484,7 @@ class LogCleanerTest {
 
     // 4) Simulate recovery after swap file is created and old segments files are renamed
     //    to .deleted. Clean operation is resumed during recovery.
-    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
+    log.logSegments.head.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
     log = recoverAndCheck(config, cleanedKeys)
 
     // add some more messages and clean the log again
@@ -1502,7 +1502,7 @@ class LogCleanerTest {
 
     // 5) Simulate recovery after a subset of swap files are renamed to regular files and old segments files are renamed
     //    to .deleted. Clean operation is resumed during recovery.
-    log.logSegments.head.timeIndex.file.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", Log.SwapFileSuffix)))
+    log.logSegments.head.timeIndex.file.renameTo(new File(CoreUtils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", UnifiedLog.SwapFileSuffix)))
     log = recoverAndCheck(config, cleanedKeys)
 
     // add some more messages and clean the log again
@@ -1716,7 +1716,7 @@ class LogCleanerTest {
   def testMaxCleanTimeSecs(): Unit = {
     val logCleaner = new LogCleaner(new CleanerConfig,
       logDirs = Array(TestUtils.tempDir()),
-      logs = new Pool[TopicPartition, Log](),
+      logs = new Pool[TopicPartition, UnifiedLog](),
       logDirFailureChannel = new LogDirFailureChannel(1),
       time = time)
 
@@ -1734,7 +1734,7 @@ class LogCleanerTest {
   }
 
 
-  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
+  private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
       yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset
   }
@@ -1770,7 +1770,7 @@ class LogCleanerTest {
     messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
 
   private def makeLog(dir: File = dir, config: LogConfig = logConfig, recoveryPoint: Long = 0L) =
-    Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = recoveryPoint, scheduler = time.scheduler,
+    UnifiedLog(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = recoveryPoint, scheduler = time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true)
@@ -1785,7 +1785,7 @@ class LogCleanerTest {
                 time = time,
                 checkDone = checkDone)
 
-  private def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
+  private def writeToLog(log: UnifiedLog, seq: Iterable[(Int, Int)]): Iterable[Long] = {
     for ((key, value) <- seq)
       yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset.get.messageOffset
   }
@@ -1801,7 +1801,7 @@ class LogCleanerTest {
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def appendTransactionalAsLeader(log: Log,
+  private def appendTransactionalAsLeader(log: UnifiedLog,
                                           producerId: Long,
                                           producerEpoch: Short,
                                           leaderEpoch: Int = 0,
@@ -1809,7 +1809,7 @@ class LogCleanerTest {
     appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, origin = origin)
   }
 
-  private def appendIdempotentAsLeader(log: Log,
+  private def appendIdempotentAsLeader(log: UnifiedLog,
                                        producerId: Long,
                                        producerEpoch: Short,
                                        isTransactional: Boolean = false,
@@ -1851,7 +1851,7 @@ class LogCleanerTest {
 
   private def tombstoneRecord(key: Int): MemoryRecords = record(key, null)
 
-  private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): Log = {
+  private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = {
     LogTestUtils.recoverAndCheck(dir, config, expectedKeys, new BrokerTopicStats(), time, time.scheduler)
   }
 }
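
The recovery scenarios above lean on the per-suffix constants that moved with the rename (UnifiedLog.CleanedFileSuffix, UnifiedLog.SwapFileSuffix, UnifiedLog.DeletedFileSuffix). A small sketch of the "aborted clean" setup used in step 1, assuming the kafka core classpath; the helper name is illustrative:

    import java.io.File
    import java.nio.file.Paths
    import kafka.log.UnifiedLog
    import kafka.utils.CoreUtils
    import org.apache.kafka.common.utils.Utils

    // Hypothetical helper distilled from the steps above: tag the first segment as .cleaned
    // and undo any pending .deleted renames, so that reloading the log exercises the
    // abort-clean path; the .swap variants in steps 2-5 follow the same pattern.
    def simulateAbortedClean(log: UnifiedLog, dir: File): Unit = {
      log.logSegments.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
      for (file <- dir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix))
        Utils.atomicMoveWithFallback(file.toPath,
          Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")), false)
    }
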
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index 3efe2be..e10b5ab 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -61,7 +61,7 @@ class LogConcurrencyTest {
     testUncommittedDataNotConsumed(createLog(logConfig))
   }
 
-  def testUncommittedDataNotConsumed(log: Log): Unit = {
+  def testUncommittedDataNotConsumed(log: UnifiedLog): Unit = {
     val executor = Executors.newFixedThreadPool(2)
     try {
       val maxOffset = 5000
@@ -82,7 +82,7 @@ class LogConcurrencyTest {
    * Simple consumption task which reads the log in ascending order and collects
    * consumed batches for validation
    */
-  private class ConsumerTask(log: Log, lastOffset: Int) extends Callable[Unit] {
+  private class ConsumerTask(log: UnifiedLog, lastOffset: Int) extends Callable[Unit] {
     val consumedBatches = ListBuffer.empty[FetchedBatch]
 
     override def call(): Unit = {
@@ -105,7 +105,7 @@ class LogConcurrencyTest {
   /**
    * This class simulates basic leader/follower behavior.
    */
-  private class LogAppendTask(log: Log, lastOffset: Long) extends Callable[Unit] {
+  private class LogAppendTask(log: UnifiedLog, lastOffset: Long) extends Callable[Unit] {
     override def call(): Unit = {
       var leaderEpoch = 1
       var isLeader = true
@@ -140,8 +140,8 @@ class LogConcurrencyTest {
     }
   }
 
-  private def createLog(config: LogConfig = LogConfig(new Properties())): Log = {
-    Log(dir = logDir,
+  private def createLog(config: LogConfig = LogConfig(new Properties())): UnifiedLog = {
+    UnifiedLog(dir = logDir,
       config = config,
       logStartOffset = 0L,
       recoveryPoint = 0L,
@@ -155,7 +155,7 @@ class LogConcurrencyTest {
       keepPartitionMetadataFile = true)
   }
 
-  private def validateConsumedData(log: Log, consumedBatches: Iterable[FetchedBatch]): Unit = {
+  private def validateConsumedData(log: UnifiedLog, consumedBatches: Iterable[FetchedBatch]): Unit = {
     val iter = consumedBatches.iterator
     log.logSegments.foreach { segment =>
       segment.log.batches.forEach { batch =>
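
The appender and consumer tasks above drive UnifiedLog through the same public calls the rest of these tests use: appendAsLeader and read. A single-threaded sketch of that surface, assuming the kafka core test classpath (LogTestUtils, TestUtils); offsets and record contents are illustrative:

    import java.util.Properties
    import kafka.log.{LogConfig, LogTestUtils, UnifiedLog}
    import kafka.server.{BrokerTopicStats, FetchLogEnd}
    import kafka.utils.{MockTime, TestUtils}
    import org.apache.kafka.common.record.SimpleRecord

    val time = new MockTime()
    val log: UnifiedLog = LogTestUtils.createLog(TestUtils.tempDir(), LogConfig(new Properties()),
      new BrokerTopicStats, time.scheduler, time)
    // Append one batch as leader, then read it back up to the log end offset.
    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("k".getBytes, "v".getBytes))),
      leaderEpoch = 0)
    val fetched = log.read(startOffset = 0L, maxLength = Int.MaxValue,
      isolation = FetchLogEnd, minOneMessage = true)
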
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 33ee41d..ad924d6 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -67,7 +67,7 @@ class LogLoaderTest {
     val logConfig = LogConfig(logProps)
     val logDirs = Seq(logDir)
     val topicPartition = new TopicPartition("foo", 0)
-    var log: Log = null
+    var log: UnifiedLog = null
     val time = new MockTime()
     var cleanShutdownInterceptedValue = false
     case class SimulateError(var hasError: Boolean = false)
@@ -85,19 +85,19 @@ class LogLoaderTest {
 
         override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
                              logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,
-                             topicConfigs: Map[String, LogConfig]): Log = {
+                             topicConfigs: Map[String, LogConfig]): UnifiedLog = {
           if (simulateError.hasError) {
             throw new RuntimeException("Simulated error")
           }
           cleanShutdownInterceptedValue = hadCleanShutdown
-          val topicPartition = Log.parseTopicPartitionName(logDir)
+          val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
           val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
           val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
           val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
           val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
           val maxProducerIdExpirationMs = 60 * 60 * 1000
           val segments = new LogSegments(topicPartition)
-          val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+          val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
           val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, time)
           val loadLogParams = LoadLogParams(logDir, topicPartition, config, time.scheduler, time,
             logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
@@ -106,7 +106,7 @@ class LogLoaderTest {
           val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
             offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
             logDirFailureChannel)
-          new Log(offsets.logStartOffset, localLog, brokerTopicStats,
+          new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
             LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
             producerStateManager, None, true)
         }
@@ -175,12 +175,12 @@ class LogLoaderTest {
                         time: Time = mockTime,
                         maxProducerIdExpirationMs: Int = maxProducerIdExpirationMs,
                         producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
-                        lastShutdownClean: Boolean = true): Log = {
+                        lastShutdownClean: Boolean = true): UnifiedLog = {
     LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
       maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, lastShutdownClean)
   }
 
-  private def createLogWithOffsetOverflow(logConfig: LogConfig): (Log, LogSegment) = {
+  private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog, LogSegment) = {
     LogTestUtils.initializeLogDirWithOverflowedSegment(logDir)
 
     val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
@@ -191,7 +191,7 @@ class LogLoaderTest {
     (log, segmentWithOverflow)
   }
 
-  private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): Log = {
+  private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = {
     // method is called only in case of recovery from hard reset
     LogTestUtils.recoverAndCheck(logDir, config, expectedKeys, brokerTopicStats, mockTime, mockTime.scheduler)
   }
@@ -257,7 +257,7 @@ class LogLoaderTest {
 
     def createLogWithInterceptedReads(recoveryPoint: Long) = {
       val maxProducerIdExpirationMs = 60 * 60 * 1000
-      val topicPartition = Log.parseTopicPartitionName(logDir)
+      val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
       val logDirFailureChannel = new LogDirFailureChannel(10)
       // Intercept all segment read calls
       val interceptedLogSegments = new LogSegments(topicPartition) {
@@ -279,7 +279,7 @@ class LogLoaderTest {
           super.add(wrapper)
         }
       }
-      val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
+      val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
       val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, mockTime)
       val loadLogParams = LoadLogParams(
         logDir,
@@ -299,7 +299,7 @@ class LogLoaderTest {
       val localLog = new LocalLog(logDir, logConfig, interceptedLogSegments, offsets.recoveryPoint,
         offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
         logDirFailureChannel)
-      new Log(offsets.logStartOffset, localLog, brokerTopicStats,
+      new UnifiedLog(offsets.logStartOffset, localLog, brokerTopicStats,
         LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache, producerStateManager,
         None, keepPartitionMetadataFile = true)
     }
@@ -349,12 +349,12 @@ class LogLoaderTest {
 
     EasyMock.replay(stateManager)
 
-    val topicPartition = Log.parseTopicPartitionName(logDir)
+    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
     val config = LogConfig(new Properties())
     val maxProducerIdExpirationMs = 300000
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       topicPartition,
@@ -372,7 +372,7 @@ class LogLoaderTest {
     val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
       logDirFailureChannel)
-    val log = new Log(offsets.logStartOffset,
+    val log = new UnifiedLog(offsets.logStartOffset,
       localLog,
       brokerTopicStats = brokerTopicStats,
       producerIdExpirationCheckIntervalMs = 30000,
@@ -476,14 +476,14 @@ class LogLoaderTest {
 
     EasyMock.replay(stateManager)
 
-    val topicPartition = Log.parseTopicPartitionName(logDir)
+    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val logProps = new Properties()
     logProps.put(LogConfig.MessageFormatVersionProp, "0.10.2")
     val config = LogConfig(logProps)
     val maxProducerIdExpirationMs = 300000
     val logDirFailureChannel = null
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       topicPartition,
@@ -501,7 +501,7 @@ class LogLoaderTest {
     val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
       logDirFailureChannel)
-    new Log(offsets.logStartOffset,
+    new UnifiedLog(offsets.logStartOffset,
       localLog,
       brokerTopicStats = brokerTopicStats,
       producerIdExpirationCheckIntervalMs = 30000,
@@ -533,14 +533,14 @@ class LogLoaderTest {
 
     EasyMock.replay(stateManager)
 
-    val topicPartition = Log.parseTopicPartitionName(logDir)
+    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val logProps = new Properties()
     logProps.put(LogConfig.MessageFormatVersionProp, "0.10.2")
     val config = LogConfig(logProps)
     val maxProducerIdExpirationMs = 300000
     val logDirFailureChannel = null
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       topicPartition,
@@ -558,7 +558,7 @@ class LogLoaderTest {
     val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
       logDirFailureChannel)
-    new Log(offsets.logStartOffset,
+    new UnifiedLog(offsets.logStartOffset,
       localLog,
       brokerTopicStats = brokerTopicStats,
       producerIdExpirationCheckIntervalMs = 30000,
@@ -592,14 +592,14 @@ class LogLoaderTest {
 
     EasyMock.replay(stateManager)
 
-    val topicPartition = Log.parseTopicPartitionName(logDir)
+    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val logProps = new Properties()
     logProps.put(LogConfig.MessageFormatVersionProp, "0.11.0")
     val config = LogConfig(logProps)
     val maxProducerIdExpirationMs = 300000
     val logDirFailureChannel = null
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       topicPartition,
@@ -617,7 +617,7 @@ class LogLoaderTest {
     val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, topicPartition,
       logDirFailureChannel)
-    new Log(offsets.logStartOffset,
+    new UnifiedLog(offsets.logStartOffset,
       localLog,
       brokerTopicStats = brokerTopicStats,
       producerIdExpirationCheckIntervalMs = 30000,
@@ -682,7 +682,7 @@ class LogLoaderTest {
     // We expect 3 snapshot files, two of which are for the first two segments, the last was written out during log closing.
     assertEquals(Seq(1, 2, 4), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
     // Inject a stray snapshot file within the bounds of the log at offset 3, it should be cleaned up after loading the log
-    val straySnapshotFile = Log.producerSnapshotFile(logDir, 3).toPath
+    val straySnapshotFile = UnifiedLog.producerSnapshotFile(logDir, 3).toPath
     Files.createFile(straySnapshotFile)
     assertEquals(Seq(1, 2, 3, 4), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
 
@@ -756,7 +756,7 @@ class LogLoaderTest {
     }
     log.close()
 
-    def verifyRecoveredLog(log: Log, expectedRecoveryPoint: Long): Unit = {
+    def verifyRecoveredLog(log: UnifiedLog, expectedRecoveryPoint: Long): Unit = {
       assertEquals(expectedRecoveryPoint, log.recoveryPoint, s"Unexpected recovery point")
       assertEquals(numMessages, log.logEndOffset, s"Should have $numMessages messages when log is reopened w/o recovery")
       assertEquals(lastIndexOffset, log.activeSegment.offsetIndex.lastOffset, "Should have same last index offset as before.")
@@ -893,10 +893,10 @@ class LogLoaderTest {
    */
   @Test
   def testBogusIndexSegmentsAreRemoved(): Unit = {
-    val bogusIndex1 = Log.offsetIndexFile(logDir, 0)
-    val bogusTimeIndex1 = Log.timeIndexFile(logDir, 0)
-    val bogusIndex2 = Log.offsetIndexFile(logDir, 5)
-    val bogusTimeIndex2 = Log.timeIndexFile(logDir, 5)
+    val bogusIndex1 = UnifiedLog.offsetIndexFile(logDir, 0)
+    val bogusTimeIndex1 = UnifiedLog.timeIndexFile(logDir, 0)
+    val bogusIndex2 = UnifiedLog.offsetIndexFile(logDir, 5)
+    val bogusTimeIndex2 = UnifiedLog.timeIndexFile(logDir, 5)
 
     // The files remain absent until we first access them because we are doing lazy loading for the time index and offset
     // index files, but in this test case we need to create these files in order to test that we will remove them.
@@ -1022,7 +1022,7 @@ class LogLoaderTest {
     //This write will roll the segment, yielding a new segment with base offset = max(1, Integer.MAX_VALUE+2) = Integer.MAX_VALUE+2
     log.appendAsFollower(set2)
     assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
-    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 2).exists)
+    assertTrue(UnifiedLog.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 2).exists)
     //This will go into the existing log
     log.appendAsFollower(set3)
     assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
@@ -1085,7 +1085,7 @@ class LogLoaderTest {
     //This write will roll the segment, yielding a new segment with base offset = max(1, Integer.MAX_VALUE+2) = Integer.MAX_VALUE+2
     log.appendAsFollower(set2)
     assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
-    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 2).exists)
+    assertTrue(UnifiedLog.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 2).exists)
     //This will go into the existing log
     log.appendAsFollower(set3)
     assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset)
@@ -1125,11 +1125,11 @@ class LogLoaderTest {
     //This write will roll the segment, yielding a new segment with base offset = max(1, 3) = 3
     log.appendAsFollower(set2)
     assertEquals(3, log.activeSegment.baseOffset)
-    assertTrue(Log.producerSnapshotFile(logDir, 3).exists)
+    assertTrue(UnifiedLog.producerSnapshotFile(logDir, 3).exists)
     //This will also roll the segment, yielding a new segment with base offset = max(5, Integer.MAX_VALUE+4) = Integer.MAX_VALUE+4
     log.appendAsFollower(set3)
     assertEquals(Integer.MAX_VALUE.toLong + 4, log.activeSegment.baseOffset)
-    assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 4).exists)
+    assertTrue(UnifiedLog.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 4).exists)
     //This will go into the existing log
     log.appendAsFollower(set4)
     assertEquals(Integer.MAX_VALUE.toLong + 4, log.activeSegment.baseOffset)
@@ -1175,11 +1175,11 @@ class LogLoaderTest {
     // Simulate recovery just after .cleaned file is created, before rename to .swap. On recovery, existing split
     // operation is aborted but the recovery process itself kicks off split which should complete.
     newSegments.reverse.foreach(segment => {
-      segment.changeFileSuffixes("", Log.CleanedFileSuffix)
+      segment.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
       segment.truncateTo(0)
     })
-    for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+    for (file <- logDir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix))
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")))
 
     val recoveredLog = recoverAndCheck(logConfig, expectedKeys)
     assertEquals(expectedKeys, LogTestUtils.keysInLog(recoveredLog))
@@ -1201,13 +1201,13 @@ class LogLoaderTest {
     // operation is aborted but the recovery process itself kicks off split which should complete.
     newSegments.reverse.foreach { segment =>
       if (segment != newSegments.last)
-        segment.changeFileSuffixes("", Log.CleanedFileSuffix)
+        segment.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
       else
-        segment.changeFileSuffixes("", Log.SwapFileSuffix)
+        segment.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
       segment.truncateTo(0)
     }
-    for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+    for (file <- logDir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix))
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")))
 
     val recoveredLog = recoverAndCheck(logConfig, expectedKeys)
     assertEquals(expectedKeys, LogTestUtils.keysInLog(recoveredLog))
@@ -1228,10 +1228,10 @@ class LogLoaderTest {
     // Simulate recovery right after all new segments have been renamed to .swap. On recovery, existing split operation
     // is completed and the old segment must be deleted.
     newSegments.reverse.foreach(segment => {
-      segment.changeFileSuffixes("", Log.SwapFileSuffix)
+      segment.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
     })
-    for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+    for (file <- logDir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix))
+      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, UnifiedLog.DeletedFileSuffix, "")))
 
     // Truncate the old segment
     segmentWithOverflow.truncateTo(0)
@@ -1254,9 +1254,9 @@ class LogLoaderTest {
 
     // Simulate recovery right after all new segments have been renamed to .swap and old segment has been deleted. On
     // recovery, existing split operation is completed.
-    newSegments.reverse.foreach(_.changeFileSuffixes("", Log.SwapFileSuffix))
+    newSegments.reverse.foreach(_.changeFileSuffixes("", UnifiedLog.SwapFileSuffix))
 
-    for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
+    for (file <- logDir.listFiles if file.getName.endsWith(UnifiedLog.DeletedFileSuffix))
       Utils.delete(file)
 
     // Truncate the old segment
@@ -1280,7 +1280,7 @@ class LogLoaderTest {
 
     // Simulate recovery right after one of the new segment has been renamed to .swap and the other to .log. On
     // recovery, existing split operation is completed.
-    newSegments.last.changeFileSuffixes("", Log.SwapFileSuffix)
+    newSegments.last.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
 
     // Truncate the old segment
     segmentWithOverflow.truncateTo(0)
@@ -1613,7 +1613,7 @@ class LogLoaderTest {
     assertEquals(4, log.logEndOffset)
 
     val offsetsWithSnapshotFiles = (1 until 5)
-        .map(offset => SnapshotFile(Log.producerSnapshotFile(logDir, offset)))
+        .map(offset => SnapshotFile(UnifiedLog.producerSnapshotFile(logDir, offset)))
         .filter(snapshotFile => snapshotFile.file.exists())
         .map(_.offset)
     val inMemorySnapshotFiles = (1 until 5)
@@ -1664,6 +1664,6 @@ class LogLoaderTest {
     }
     assertTrue(offsetsWithMissingSnapshotFiles.isEmpty,
       s"Found offsets with missing producer state snapshot files: $offsetsWithMissingSnapshotFiles")
-    assertFalse(logDir.list().exists(_.endsWith(Log.DeletedFileSuffix)), "Expected no files to be present with the deleted file suffix")
+    assertFalse(logDir.list().exists(_.endsWith(UnifiedLog.DeletedFileSuffix)), "Expected no files to be present with the deleted file suffix")
   }
 }
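
Several of the tests above bypass the factory and assemble the log by hand: load the segments with LogLoader, wrap them in a LocalLog, and hand both to the new UnifiedLog constructor. A condensed sketch of that assembly, assuming a MockTime and an existing log directory; the LoadLogParams argument list mirrors the calls in these tests and should be treated as an assumption to verify against LogLoader.scala:

    import java.io.File
    import kafka.log._
    import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
    import kafka.utils.MockTime

    def buildUnifiedLog(logDir: File, config: LogConfig, time: MockTime): UnifiedLog = {
      val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
      val logDirFailureChannel = new LogDirFailureChannel(10)
      val maxProducerIdExpirationMs = 60 * 60 * 1000
      val segments = new LogSegments(topicPartition)
      val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(
        logDir, topicPartition, logDirFailureChannel, config.recordVersion, "")
      val producerStateManager =
        new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, time)
      // Argument order follows the tests above (assumed): dir, partition, config, scheduler,
      // time, failure channel, hadCleanShutdown, segments, start/recovery checkpoints,
      // producer id expiration, epoch cache, producer state manager.
      val offsets = LogLoader.load(LoadLogParams(
        logDir, topicPartition, config, time.scheduler, time, logDirFailureChannel,
        true,  // hadCleanShutdown
        segments,
        0L,    // log start offset checkpoint
        0L,    // recovery point checkpoint
        maxProducerIdExpirationMs, leaderEpochCache, producerStateManager))
      val localLog = new LocalLog(logDir, config, segments, offsets.recoveryPoint,
        offsets.nextOffsetMetadata, time.scheduler, time, topicPartition, logDirFailureChannel)
      new UnifiedLog(offsets.logStartOffset, localLog, new BrokerTopicStats,
        LogManager.ProducerIdExpirationCheckIntervalMs, leaderEpochCache,
        producerStateManager, None, keepPartitionMetadataFile = true)
    }
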
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 4e08b34..9eb1b42 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -497,7 +497,7 @@ class LogManagerTest {
     }
   }
 
-  private def readLog(log: Log, offset: Long, maxLength: Int = 1024): FetchDataInfo = {
+  private def readLog(log: UnifiedLog, offset: Long, maxLength: Int = 1024): FetchDataInfo = {
     log.read(offset, maxLength, isolation = FetchLogEnd, minOneMessage = true)
   }
 
@@ -511,7 +511,7 @@ class LogManagerTest {
     val spyConfigRepository = spy(new MockConfigRepository)
     logManager = createLogManager(configRepository = spyConfigRepository)
     val spyLogManager = spy(logManager)
-    val mockLog = mock(classOf[Log])
+    val mockLog = mock(classOf[UnifiedLog])
 
     val testTopicOne = "test-topic-one"
     val testTopicTwo = "test-topic-two"
@@ -566,7 +566,7 @@ class LogManagerTest {
     val spyConfigRepository = spy(new MockConfigRepository)
     logManager = createLogManager(configRepository = spyConfigRepository)
     val spyLogManager = spy(logManager)
-    val mockLog = mock(classOf[Log])
+    val mockLog = mock(classOf[UnifiedLog])
 
     val testTopicOne = "test-topic-one"
     val testTopicTwo = "test-topic-two"
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 3d2e2d7..9884576 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -564,7 +564,7 @@ class LogSegmentTest {
 
     // create a log file in a separate directory to avoid conflicting with created segments
     val tempDir = TestUtils.tempDir()
-    val fileRecords = FileRecords.open(Log.logFile(tempDir, 0))
+    val fileRecords = FileRecords.open(UnifiedLog.logFile(tempDir, 0))
 
     // Simulate a scenario where we have a single log with an offset range exceeding Int.MaxValue
     fileRecords.append(records(0, 1024))
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala
index 0345ace..7e01e3b 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentsTest.scala
@@ -235,6 +235,6 @@ class LogSegmentsTest {
 
     assertEquals(Int.MaxValue, LogSegments.sizeInBytes(Seq(logSegment)))
     assertEquals(largeSize, LogSegments.sizeInBytes(Seq(logSegment, logSegment)))
-    assertTrue(Log.sizeInBytes(Seq(logSegment, logSegment)) > Int.MaxValue)
+    assertTrue(UnifiedLog.sizeInBytes(Seq(logSegment, logSegment)) > Int.MaxValue)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index accc47b..1f32ed8 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -38,10 +38,10 @@ object LogTestUtils {
                     logDir: File,
                     indexIntervalBytes: Int = 10,
                     time: Time = Time.SYSTEM): LogSegment = {
-    val ms = FileRecords.open(Log.logFile(logDir, offset))
-    val idx = LazyIndex.forOffset(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
-    val timeIdx = LazyIndex.forTime(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
-    val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
+    val ms = FileRecords.open(UnifiedLog.logFile(logDir, offset))
+    val idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
+    val timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
+    val txnIndex = new TransactionIndex(offset, UnifiedLog.transactionIndexFile(logDir, offset))
 
     new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
   }
@@ -81,8 +81,8 @@ object LogTestUtils {
                 producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
                 lastShutdownClean: Boolean = true,
                 topicId: Option[Uuid] = None,
-                keepPartitionMetadataFile: Boolean = true): Log = {
-    Log(dir = dir,
+                keepPartitionMetadataFile: Boolean = true): UnifiedLog = {
+    UnifiedLog(dir = dir,
       config = config,
       logStartOffset = logStartOffset,
       recoveryPoint = recoveryPoint,
@@ -102,9 +102,9 @@ object LogTestUtils {
    * @param log Log to check
    * @return true if log contains at least one segment with offset overflow; false otherwise
    */
-  def hasOffsetOverflow(log: Log): Boolean = firstOverflowSegment(log).isDefined
+  def hasOffsetOverflow(log: UnifiedLog): Boolean = firstOverflowSegment(log).isDefined
 
-  def firstOverflowSegment(log: Log): Option[LogSegment] = {
+  def firstOverflowSegment(log: UnifiedLog): Option[LogSegment] = {
     def hasOverflow(baseOffset: Long, batch: RecordBatch): Boolean =
       batch.lastOffset > baseOffset + Int.MaxValue || batch.baseOffset < baseOffset
 
@@ -117,7 +117,7 @@ object LogTestUtils {
   }
 
   def rawSegment(logDir: File, baseOffset: Long): FileRecords =
-    FileRecords.open(Log.logFile(logDir, baseOffset))
+    FileRecords.open(UnifiedLog.logFile(logDir, baseOffset))
 
   /**
    * Initialize the given log directory with a set of segments, one of which will have an
@@ -138,8 +138,8 @@ object LogTestUtils {
       segment.append(MemoryRecords.withRecords(baseOffset + Int.MaxValue - 1, CompressionType.NONE, 0,
         record(baseOffset + Int.MaxValue - 1)))
       // Need to create the offset files explicitly to avoid triggering segment recovery to truncate segment.
-      Log.offsetIndexFile(logDir, baseOffset).createNewFile()
-      Log.timeIndexFile(logDir, baseOffset).createNewFile()
+      UnifiedLog.offsetIndexFile(logDir, baseOffset).createNewFile()
+      UnifiedLog.timeIndexFile(logDir, baseOffset).createNewFile()
       baseOffset + Int.MaxValue
     }
 
@@ -165,29 +165,29 @@ object LogTestUtils {
   }
 
   /* extract all the keys from a log */
-  def keysInLog(log: Log): Iterable[Long] = {
+  def keysInLog(log: UnifiedLog): Iterable[Long] = {
     for (logSegment <- log.logSegments;
          batch <- logSegment.log.batches.asScala if !batch.isControlBatch;
          record <- batch.asScala if record.hasValue && record.hasKey)
       yield TestUtils.readString(record.key).toLong
   }
 
-  def recoverAndCheck(logDir: File, config: LogConfig, expectedKeys: Iterable[Long], brokerTopicStats: BrokerTopicStats, time: Time, scheduler: Scheduler): Log = {
+  def recoverAndCheck(logDir: File, config: LogConfig, expectedKeys: Iterable[Long], brokerTopicStats: BrokerTopicStats, time: Time, scheduler: Scheduler): UnifiedLog = {
     // Recover log file and check that after recovery, keys are as expected
     // and all temporary files have been deleted
     val recoveredLog = createLog(logDir, config, brokerTopicStats, scheduler, time, lastShutdownClean = false)
     time.sleep(config.fileDeleteDelayMs + 1)
     for (file <- logDir.listFiles) {
-      assertFalse(file.getName.endsWith(Log.DeletedFileSuffix), "Unexpected .deleted file after recovery")
-      assertFalse(file.getName.endsWith(Log.CleanedFileSuffix), "Unexpected .cleaned file after recovery")
-      assertFalse(file.getName.endsWith(Log.SwapFileSuffix), "Unexpected .swap file after recovery")
+      assertFalse(file.getName.endsWith(UnifiedLog.DeletedFileSuffix), "Unexpected .deleted file after recovery")
+      assertFalse(file.getName.endsWith(UnifiedLog.CleanedFileSuffix), "Unexpected .cleaned file after recovery")
+      assertFalse(file.getName.endsWith(UnifiedLog.SwapFileSuffix), "Unexpected .swap file after recovery")
     }
     assertEquals(expectedKeys, keysInLog(recoveredLog))
     assertFalse(hasOffsetOverflow(recoveredLog))
     recoveredLog
   }
 
-  def appendEndTxnMarkerAsLeader(log: Log,
+  def appendEndTxnMarkerAsLeader(log: UnifiedLog,
                                  producerId: Long,
                                  producerEpoch: Short,
                                  controlType: ControlRecordType,
@@ -210,31 +210,31 @@ object LogTestUtils {
     MemoryRecords.withEndTransactionMarker(offset, timestamp, partitionLeaderEpoch, producerId, epoch, marker)
   }
 
-  def readLog(log: Log,
-             startOffset: Long,
-             maxLength: Int,
-             isolation: FetchIsolation = FetchLogEnd,
-             minOneMessage: Boolean = true): FetchDataInfo = {
+  def readLog(log: UnifiedLog,
+              startOffset: Long,
+              maxLength: Int,
+              isolation: FetchIsolation = FetchLogEnd,
+              minOneMessage: Boolean = true): FetchDataInfo = {
     log.read(startOffset, maxLength, isolation, minOneMessage)
   }
 
-  def allAbortedTransactions(log: Log): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
+  def allAbortedTransactions(log: UnifiedLog): Iterable[AbortedTxn] = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
 
   def deleteProducerSnapshotFiles(logDir: File): Unit = {
-    val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix))
+    val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(UnifiedLog.ProducerSnapshotFileSuffix))
     files.foreach(Utils.delete)
   }
 
   def listProducerSnapshotOffsets(logDir: File): Seq[Long] =
     ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted
 
-  def assertLeaderEpochCacheEmpty(log: Log): Unit = {
+  def assertLeaderEpochCacheEmpty(log: UnifiedLog): Unit = {
     assertEquals(None, log.leaderEpochCache)
     assertEquals(None, log.latestEpoch)
     assertFalse(LeaderEpochCheckpointFile.newFile(log.dir).exists())
   }
 
-  def appendNonTransactionalAsLeader(log: Log, numRecords: Int): Unit = {
+  def appendNonTransactionalAsLeader(log: UnifiedLog, numRecords: Int): Unit = {
     val simpleRecords = (0 until numRecords).map { seq =>
       new SimpleRecord(s"$seq".getBytes)
     }
@@ -242,14 +242,14 @@ object LogTestUtils {
     log.appendAsLeader(records, leaderEpoch = 0)
   }
 
-  def appendTransactionalAsLeader(log: Log,
+  def appendTransactionalAsLeader(log: UnifiedLog,
                                   producerId: Long,
                                   producerEpoch: Short,
                                   time: Time): Int => Unit = {
     appendIdempotentAsLeader(log, producerId, producerEpoch, time, isTransactional = true)
   }
 
-  def appendIdempotentAsLeader(log: Log,
+  def appendIdempotentAsLeader(log: UnifiedLog,
                                producerId: Long,
                                producerEpoch: Short,
                                time: Time,
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 59bbb23..0c2fb6b 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -872,9 +872,9 @@ class ProducerStateManagerTest {
     // the broker shutdown cleanly and emitted a snapshot file larger than the base offset of the active segment.
 
     // Create 3 snapshot files at different offsets.
-    Log.producerSnapshotFile(logDir, 5).createNewFile() // not stray
-    Log.producerSnapshotFile(logDir, 2).createNewFile() // stray
-    Log.producerSnapshotFile(logDir, 42).createNewFile() // not stray
+    UnifiedLog.producerSnapshotFile(logDir, 5).createNewFile() // not stray
+    UnifiedLog.producerSnapshotFile(logDir, 2).createNewFile() // stray
+    UnifiedLog.producerSnapshotFile(logDir, 42).createNewFile() // not stray
 
     // claim that we only have one segment with a base offset of 5
     stateManager.removeStraySnapshots(Seq(5))
@@ -892,9 +892,9 @@ class ProducerStateManagerTest {
     // Snapshots associated with an offset in the list of segment base offsets should remain.
 
     // Create 3 snapshot files at different offsets.
-    Log.producerSnapshotFile(logDir, 5).createNewFile() // stray
-    Log.producerSnapshotFile(logDir, 2).createNewFile() // stray
-    Log.producerSnapshotFile(logDir, 42).createNewFile() // not stray
+    UnifiedLog.producerSnapshotFile(logDir, 5).createNewFile() // stray
+    UnifiedLog.producerSnapshotFile(logDir, 2).createNewFile() // stray
+    UnifiedLog.producerSnapshotFile(logDir, 42).createNewFile() // not stray
 
     stateManager.removeStraySnapshots(Seq(42))
     assertEquals(Seq(42), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
@@ -906,11 +906,11 @@ class ProducerStateManagerTest {
    */
   @Test
   def testRemoveAndMarkSnapshotForDeletion(): Unit = {
-    Log.producerSnapshotFile(logDir, 5).createNewFile()
+    UnifiedLog.producerSnapshotFile(logDir, 5).createNewFile()
     val manager = new ProducerStateManager(partition, logDir, time = time)
     assertTrue(manager.latestSnapshotOffset.isDefined)
     val snapshot = manager.removeAndMarkSnapshotForDeletion(5).get
-    assertTrue(snapshot.file.toPath.toString.endsWith(Log.DeletedFileSuffix))
+    assertTrue(snapshot.file.toPath.toString.endsWith(UnifiedLog.DeletedFileSuffix))
     assertTrue(manager.latestSnapshotOffset.isEmpty)
   }
 
@@ -923,7 +923,7 @@ class ProducerStateManagerTest {
    */
   @Test
   def testRemoveAndMarkSnapshotForDeletionAlreadyDeleted(): Unit = {
-    val file = Log.producerSnapshotFile(logDir, 5)
+    val file = UnifiedLog.producerSnapshotFile(logDir, 5)
     file.createNewFile()
     val manager = new ProducerStateManager(partition, logDir, time = time)
     assertTrue(manager.latestSnapshotOffset.isDefined)
@@ -945,7 +945,7 @@ class ProducerStateManagerTest {
     // Truncate the last snapshot
     val latestSnapshotOffset = stateManager.latestSnapshotOffset
     assertEquals(Some(2L), latestSnapshotOffset)
-    val snapshotToTruncate = Log.producerSnapshotFile(logDir, latestSnapshotOffset.get)
+    val snapshotToTruncate = UnifiedLog.producerSnapshotFile(logDir, latestSnapshotOffset.get)
     val channel = FileChannel.open(snapshotToTruncate.toPath, StandardOpenOption.WRITE)
     try {
       makeFileCorrupt(channel)
@@ -1005,6 +1005,6 @@ class ProducerStateManagerTest {
   }
 
   private def currentSnapshotOffsets: Set[Long] =
-    logDir.listFiles.map(Log.offsetFromFile).toSet
+    logDir.listFiles.map(UnifiedLog.offsetFromFile).toSet
 
 }
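
The snapshot-handling cases above exercise the file helpers that moved with the rename (UnifiedLog.producerSnapshotFile, UnifiedLog.offsetFromFile, UnifiedLog.DeletedFileSuffix). A short sketch of the stray-snapshot cleanup path, assuming the kafka core test classpath; the topic name and offsets are illustrative:

    import java.io.File
    import kafka.log.{ProducerStateManager, UnifiedLog}
    import kafka.utils.{MockTime, TestUtils}
    import org.apache.kafka.common.TopicPartition

    val logDir: File = TestUtils.tempDir()
    val time = new MockTime()
    // Create two snapshot files, then claim the log's only segment base offset is 42:
    // the snapshot at 5 is stray and removed, the one at 42 is retained.
    UnifiedLog.producerSnapshotFile(logDir, 42).createNewFile()
    UnifiedLog.producerSnapshotFile(logDir, 5).createNewFile()
    val stateManager = new ProducerStateManager(new TopicPartition("topic", 0), logDir, time = time)
    stateManager.removeStraySnapshots(Seq(42L))
    assert(ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted == Seq(42L))
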
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
similarity index 98%
rename from core/src/test/scala/unit/kafka/log/LogTest.scala
rename to core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index af7cc16..7967134 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -45,7 +45,7 @@ import scala.collection.Map
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
 
-class LogTest {
+class UnifiedLogTest {
   var config: KafkaConfig = null
   val brokerTopicStats = new BrokerTopicStats
   val tmpDir = TestUtils.tempDir()
@@ -67,8 +67,8 @@ class LogTest {
 
   def createEmptyLogs(dir: File, offsets: Int*): Unit = {
     for(offset <- offsets) {
-      Log.logFile(dir, offset).createNewFile()
-      Log.offsetIndexFile(dir, offset).createNewFile()
+      UnifiedLog.logFile(dir, offset).createNewFile()
+      UnifiedLog.offsetIndexFile(dir, offset).createNewFile()
     }
   }
 
@@ -171,7 +171,7 @@ class LogTest {
   }
 
   private def testTruncateBelowFirstUnstableOffset(
-    truncateFunc: Log => (Long => Unit)
+    truncateFunc: UnifiedLog => (Long => Unit)
   ): Unit = {
     // Verify that truncation below the first unstable offset correctly
     // resets the producer state. Specifically we are testing the case when
@@ -266,7 +266,7 @@ class LogTest {
     assertHighWatermark(4L)
   }
 
-  private def assertNonEmptyFetch(log: Log, offset: Long, isolation: FetchIsolation): Unit = {
+  private def assertNonEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation): Unit = {
     val readInfo = log.read(startOffset = offset,
       maxLength = Int.MaxValue,
       isolation = isolation,
@@ -288,7 +288,7 @@ class LogTest {
     assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata)
   }
 
-  private def assertEmptyFetch(log: Log, offset: Long, isolation: FetchIsolation): Unit = {
+  private def assertEmptyFetch(log: UnifiedLog, offset: Long, isolation: FetchIsolation): Unit = {
     val readInfo = log.read(startOffset = offset,
       maxLength = Int.MaxValue,
       isolation = isolation,
@@ -463,8 +463,8 @@ class LogTest {
   @Test
   def testOffsetFromProducerSnapshotFile(): Unit = {
     val offset = 23423423L
-    val snapshotFile = Log.producerSnapshotFile(tmpDir, offset)
-    assertEquals(offset, Log.offsetFromFile(snapshotFile))
+    val snapshotFile = UnifiedLog.producerSnapshotFile(tmpDir, offset)
+    assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile))
   }
 
   /**
@@ -620,7 +620,7 @@ class LogTest {
   def testLogSegmentsCallCorrect(): Unit = {
     // Create 3 segments and make sure we get the right values from various logSegments calls.
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
-    def getSegmentOffsets(log :Log, from: Long, to: Long) = log.logSegments(from, to).map { _.baseOffset }
+    def getSegmentOffsets(log :UnifiedLog, from: Long, to: Long) = log.logSegments(from, to).map { _.baseOffset }
     val setSize = createRecords.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
@@ -1017,7 +1017,7 @@ class LogTest {
    */
   @Test
   def testLoadingLogDeletesProducerStateSnapshotsPastLogEndOffset(): Unit = {
-    val straySnapshotFile = Log.producerSnapshotFile(logDir, 42).toPath
+    val straySnapshotFile = UnifiedLog.producerSnapshotFile(logDir, 42).toPath
     Files.createFile(straySnapshotFile)
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
     createLog(logDir, logConfig)
@@ -2120,8 +2120,8 @@ class LogTest {
     log.deleteOldSegments()
 
     assertEquals(1, log.numberOfSegments, "Only one segment should remain.")
-    assertTrue(segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
-      segments.forall(_.lazyOffsetIndex.file.getName.endsWith(Log.DeletedFileSuffix)),
+    assertTrue(segments.forall(_.log.file.getName.endsWith(UnifiedLog.DeletedFileSuffix)) &&
+      segments.forall(_.lazyOffsetIndex.file.getName.endsWith(UnifiedLog.DeletedFileSuffix)),
       "All log and index files should end in .deleted")
     assertTrue(segments.forall(_.log.file.exists) && segments.forall(_.lazyOffsetIndex.file.exists),
       "The .deleted files should still be there.")
@@ -2234,8 +2234,8 @@ class LogTest {
     assertEquals(Some(5), log.latestEpoch)
 
     // Ensure that after a directory rename, the epoch cache is written to the right location
-    val tp = Log.parseTopicPartitionName(log.dir)
-    log.renameDir(Log.logDeleteDirName(tp))
+    val tp = UnifiedLog.parseTopicPartitionName(log.dir)
+    log.renameDir(UnifiedLog.logDeleteDirName(tp))
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
     assertEquals(Some(10), log.latestEpoch)
     assertTrue(LeaderEpochCheckpointFile.newFile(log.dir).exists())
@@ -2255,8 +2255,8 @@ class LogTest {
     assertEquals(Some(5), log.latestEpoch)
 
     // Ensure that after a directory rename, the partition metadata file is written to the right location.
-    val tp = Log.parseTopicPartitionName(log.dir)
-    log.renameDir(Log.logDeleteDirName(tp))
+    val tp = UnifiedLog.parseTopicPartitionName(log.dir)
+    log.renameDir(UnifiedLog.logDeleteDirName(tp))
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10)
     assertEquals(Some(10), log.latestEpoch)
     assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
@@ -2278,8 +2278,8 @@ class LogTest {
     log.partitionMetadataFile.record(topicId)
 
     // Ensure that after a directory rename, the partition metadata file is written to the right location.
-    val tp = Log.parseTopicPartitionName(log.dir)
-    log.renameDir(Log.logDeleteDirName(tp))
+    val tp = UnifiedLog.parseTopicPartitionName(log.dir)
+    log.renameDir(UnifiedLog.logDeleteDirName(tp))
     assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
     assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
 
@@ -2351,14 +2351,14 @@ class LogTest {
     val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig)
     assertTrue(LogTestUtils.hasOffsetOverflow(log), "At least one segment must have offset overflow")
 
-    val allRecordsBeforeSplit = LogTest.allRecords(log)
+    val allRecordsBeforeSplit = UnifiedLogTest.allRecords(log)
 
     // split the segment with overflow
     log.splitOverflowedSegment(segmentWithOverflow)
 
     // assert we were successfully able to split the segment
     assertEquals(4, log.numberOfSegments)
-    LogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
+    UnifiedLogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
 
     // verify we do not have offset overflow anymore
     assertFalse(LogTestUtils.hasOffsetOverflow(log))
@@ -2395,8 +2395,8 @@ class LogTest {
   private def testDegenerateSplitSegmentWithOverflow(segmentBaseOffset: Long, records: List[MemoryRecords]): Unit = {
     val segment = LogTestUtils.rawSegment(logDir, segmentBaseOffset)
     // Need to create the offset files explicitly to avoid triggering segment recovery to truncate segment.
-    Log.offsetIndexFile(logDir, segmentBaseOffset).createNewFile()
-    Log.timeIndexFile(logDir, segmentBaseOffset).createNewFile()
+    UnifiedLog.offsetIndexFile(logDir, segmentBaseOffset).createNewFile()
+    UnifiedLog.timeIndexFile(logDir, segmentBaseOffset).createNewFile()
     records.foreach(segment.append _)
     segment.close()
 
@@ -2407,14 +2407,14 @@ class LogTest {
       throw new AssertionError("Failed to create log with a segment which has overflowed offsets")
     }
 
-    val allRecordsBeforeSplit = LogTest.allRecords(log)
+    val allRecordsBeforeSplit = UnifiedLogTest.allRecords(log)
     log.splitOverflowedSegment(segmentWithOverflow)
 
     assertEquals(1, log.numberOfSegments)
 
     val firstBatchBaseOffset = records.head.batches.asScala.head.baseOffset
     assertEquals(firstBatchBaseOffset, log.activeSegment.baseOffset)
-    LogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
+    UnifiedLogTest.verifyRecordsInLog(log, allRecordsBeforeSplit)
 
     assertFalse(LogTestUtils.hasOffsetOverflow(log))
   }
@@ -2515,7 +2515,7 @@ class LogTest {
     assertEquals(log.logStartOffset, 15)
   }
 
-  def epochCache(log: Log): LeaderEpochFileCache = {
+  def epochCache(log: UnifiedLog): LeaderEpochFileCache = {
     log.leaderEpochCache.get
   }
 
@@ -3004,7 +3004,7 @@ class LogTest {
     assertEquals(None, log.firstUnstableOffset)
   }
 
-  private def assertCachedFirstUnstableOffset(log: Log, expectedOffset: Long): Unit = {
+  private def assertCachedFirstUnstableOffset(log: UnifiedLog, expectedOffset: Long): Unit = {
     assertTrue(log.producerStateManager.firstUnstableOffset.isDefined)
     val firstUnstableOffset = log.producerStateManager.firstUnstableOffset.get
     assertEquals(expectedOffset, firstUnstableOffset.messageOffset)
@@ -3012,7 +3012,7 @@ class LogTest {
     assertValidLogOffsetMetadata(log, firstUnstableOffset)
   }
 
-  private def assertValidLogOffsetMetadata(log: Log, offsetMetadata: LogOffsetMetadata): Unit = {
+  private def assertValidLogOffsetMetadata(log: UnifiedLog, offsetMetadata: LogOffsetMetadata): Unit = {
     assertFalse(offsetMetadata.messageOffsetOnly)
 
     val segmentBaseOffset = offsetMetadata.segmentBaseOffset
@@ -3299,7 +3299,7 @@ class LogTest {
 
   @Test
   def testLoadPartitionDirWithNoSegmentsShouldNotThrow(): Unit = {
-    val dirName = Log.logDeleteDirName(new TopicPartition("foo", 3))
+    val dirName = UnifiedLog.logDeleteDirName(new TopicPartition("foo", 3))
     val logDir = new File(tmpDir, dirName)
     logDir.mkdirs()
     val logConfig = LogTestUtils.createLogConfig()
@@ -3344,7 +3344,7 @@ class LogTest {
     builder.close()
   }
 
-  private def appendAsFollower(log: Log, records: MemoryRecords, leaderEpoch: Int = 0): Unit = {
+  private def appendAsFollower(log: UnifiedLog, records: MemoryRecords, leaderEpoch: Int = 0): Unit = {
     records.batches.forEach(_.setPartitionLeaderEpoch(leaderEpoch))
     log.appendAsFollower(records)
   }
@@ -3360,12 +3360,12 @@ class LogTest {
                         producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
                         lastShutdownClean: Boolean = true,
                         topicId: Option[Uuid] = None,
-                        keepPartitionMetadataFile: Boolean = true): Log = {
+                        keepPartitionMetadataFile: Boolean = true): UnifiedLog = {
     LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
       maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, lastShutdownClean, topicId = topicId, keepPartitionMetadataFile = keepPartitionMetadataFile)
   }
 
-  private def createLogWithOffsetOverflow(logConfig: LogConfig): (Log, LogSegment) = {
+  private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog, LogSegment) = {
     LogTestUtils.initializeLogDirWithOverflowedSegment(logDir)
 
     val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
@@ -3377,8 +3377,8 @@ class LogTest {
   }
 }
 
-object LogTest {
-  def allRecords(log: Log): List[Record] = {
+object UnifiedLogTest {
+  def allRecords(log: UnifiedLog): List[Record] = {
     val recordsFound = ListBuffer[Record]()
     for (logSegment <- log.logSegments) {
       for (batch <- logSegment.log.batches.asScala) {
@@ -3388,7 +3388,7 @@ object LogTest {
     recordsFound.toList
   }
 
-  def verifyRecordsInLog(log: Log, expectedRecords: List[Record]): Unit = {
+  def verifyRecordsInLog(log: UnifiedLog, expectedRecords: List[Record]): Unit = {
     assertEquals(expectedRecords, allRecords(log))
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
index 8339b7f..3f6271d 100644
--- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Partition
-import kafka.log.{Log, LogManager}
+import kafka.log.{UnifiedLog, LogManager}
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.TestUtils.MockAlterIsrManager
 import kafka.utils._
@@ -214,7 +214,7 @@ class IsrExpirationTest {
   }
 
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
-                                               localLog: Log): Partition = {
+                                               localLog: UnifiedLog): Partition = {
     val leaderId = config.brokerId
     val tp = new TopicPartition(topic, partitionId)
     val partition = replicaManager.createPartition(tp)
@@ -240,8 +240,8 @@ class IsrExpirationTest {
     partition
   }
 
-  private def logMock: Log = {
-    val log: Log = EasyMock.createMock(classOf[Log])
+  private def logMock: UnifiedLog = {
+    val log: UnifiedLog = EasyMock.createMock(classOf[UnifiedLog])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
     EasyMock.expect(log.logEndOffset).andReturn(leaderLogEndOffset).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 4b4a86b..58550bf 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -20,7 +20,7 @@ import java.io.File
 import java.nio.file.Files
 import java.util.Properties
 import kafka.common.{InconsistentBrokerMetadataException, InconsistentNodeIdException, KafkaException}
-import kafka.log.Log
+import kafka.log.UnifiedLog
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.test.TestUtils
@@ -174,7 +174,7 @@ class KafkaRaftServerTest {
     }
 
     // Create the metadata dir in the data directory
-    Files.createDirectory(new File(dataDir, Log.logDirName(KafkaRaftServer.MetadataPartition)).toPath)
+    Files.createDirectory(new File(dataDir, UnifiedLog.logDirName(KafkaRaftServer.MetadataPartition)).toPath)
 
     val configProperties = new Properties
     configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 92f2f4d..78f03d9 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.log.{ClientRecordDeletion, Log, LogSegment}
+import kafka.log.{ClientRecordDeletion, UnifiedLog, LogSegment}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
 import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
@@ -255,7 +255,7 @@ class LogOffsetTest extends BaseRequestTest {
    * a race condition) */
   @Test
   def testFetchOffsetsBeforeWithChangingSegmentSize(): Unit = {
-    val log: Log = EasyMock.niceMock(classOf[Log])
+    val log: UnifiedLog = EasyMock.niceMock(classOf[UnifiedLog])
     val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
     EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Int] {
       private val value = new AtomicInteger(0)
@@ -272,7 +272,7 @@ class LogOffsetTest extends BaseRequestTest {
    * different (simulating a race condition) */
   @Test
   def testFetchOffsetsBeforeWithChangingSegments(): Unit = {
-    val log: Log = EasyMock.niceMock(classOf[Log])
+    val log: UnifiedLog = EasyMock.niceMock(classOf[UnifiedLog])
     val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
     EasyMock.expect(log.logSegments).andStubAnswer {
       new IAnswer[Iterable[LogSegment]] {
@@ -312,7 +312,7 @@ class LogOffsetTest extends BaseRequestTest {
       .partitions.asScala.find(_.partitionIndex == tp.partition).get
   }
 
-  private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = {
+  private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): UnifiedLog = {
     createTopic(topic, 1, 1)
 
     val logManager = server.getLogManager
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 6c3b123..6809c4d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -20,7 +20,7 @@ import java.util.{Collections, Optional}
 
 import kafka.api.Request
 import kafka.cluster.{BrokerEndPoint, Partition}
-import kafka.log.{Log, LogManager}
+import kafka.log.{UnifiedLog, LogManager}
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils.{DelayedItem, TestUtils}
@@ -104,7 +104,7 @@ class ReplicaAlterLogDirsThreadTest {
     val partition = Mockito.mock(classOf[Partition])
     val replicaManager = Mockito.mock(classOf[ReplicaManager])
     val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
-    val futureLog = Mockito.mock(classOf[Log])
+    val futureLog = Mockito.mock(classOf[UnifiedLog])
 
     val leaderEpoch = 5
     val logEndOffset = 0
@@ -202,7 +202,7 @@ class ReplicaAlterLogDirsThreadTest {
     val partition = Mockito.mock(classOf[Partition])
     val replicaManager = Mockito.mock(classOf[ReplicaManager])
     val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
-    val futureLog = Mockito.mock(classOf[Log])
+    val futureLog = Mockito.mock(classOf[UnifiedLog])
 
     val leaderEpoch = 5
     val logEndOffset = 0
@@ -438,11 +438,11 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val logT1p0: Log = createNiceMock(classOf[Log])
-    val logT1p1: Log = createNiceMock(classOf[Log])
+    val logT1p0: UnifiedLog = createNiceMock(classOf[UnifiedLog])
+    val logT1p1: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureLogT1p0: Log = createNiceMock(classOf[Log])
-    val futureLogT1p1: Log = createNiceMock(classOf[Log])
+    val futureLogT1p0: UnifiedLog = createNiceMock(classOf[UnifiedLog])
+    val futureLogT1p1: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partitionT1p0: Partition = createMock(classOf[Partition])
     val partitionT1p1: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
@@ -531,9 +531,9 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureLog: Log = createNiceMock(classOf[Log])
+    val futureLog: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -618,8 +618,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val log: Log = createNiceMock(classOf[Log])
-    val futureLog: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
+    val futureLog: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -672,8 +672,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val log: Log = createNiceMock(classOf[Log])
-    val futureLog: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
+    val futureLog: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -762,8 +762,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val log: Log = createNiceMock(classOf[Log])
-    val futureLog: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
+    val futureLog: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
@@ -825,8 +825,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val log: Log = createNiceMock(classOf[Log])
-    val futureLog: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
+    val futureLog: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -875,8 +875,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
     val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
-    val log: Log = createNiceMock(classOf[Log])
-    val futureLog: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
+    val futureLog: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -940,7 +940,7 @@ class ReplicaAlterLogDirsThreadTest {
     assertFalse(partitionsWithError3.nonEmpty)
   }
 
-  def stub(logT1p0: Log, logT1p1: Log, futureLog: Log, partition: Partition,
+  def stub(logT1p0: UnifiedLog, logT1p1: UnifiedLog, futureLog: UnifiedLog, partition: Partition,
            replicaManager: ReplicaManager): IExpectationSetters[Option[Partition]] = {
     expect(replicaManager.localLog(t1p0)).andReturn(Some(logT1p0)).anyTimes()
     expect(replicaManager.localLogOrException(t1p0)).andReturn(logT1p0).anyTimes()
@@ -954,8 +954,8 @@ class ReplicaAlterLogDirsThreadTest {
     expect(replicaManager.onlinePartition(t1p1)).andReturn(Some(partition)).anyTimes()
   }
 
-  def stubWithFetchMessages(logT1p0: Log, logT1p1: Log, futureLog: Log, partition: Partition, replicaManager: ReplicaManager,
-          responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]): IExpectationSetters[Unit] = {
+  def stubWithFetchMessages(logT1p0: UnifiedLog, logT1p1: UnifiedLog, futureLog: UnifiedLog, partition: Partition, replicaManager: ReplicaManager,
+                            responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]): IExpectationSetters[Unit] = {
     stub(logT1p0, logT1p1, futureLog, partition, replicaManager)
     expect(replicaManager.fetchMessages(
       EasyMock.anyLong(),
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index ba1b40b..217b81e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -18,7 +18,7 @@ package kafka.server
 
 import kafka.api.{ApiVersion, KAFKA_2_6_IV0}
 import kafka.cluster.{BrokerEndPoint, Partition}
-import kafka.log.{Log, LogAppendInfo, LogManager}
+import kafka.log.{UnifiedLog, LogAppendInfo, LogManager}
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
@@ -119,7 +119,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -268,7 +268,7 @@ class ReplicaFetcherThreadTest {
     //Setup all dependencies
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -332,7 +332,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -387,7 +387,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -444,7 +444,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -522,7 +522,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createNiceMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -631,7 +631,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -697,7 +697,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -742,7 +742,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
@@ -803,7 +803,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
 
@@ -858,7 +858,7 @@ class ReplicaFetcherThreadTest {
     val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
     val logManager: LogManager = createNiceMock(classOf[LogManager])
     val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val partition: Partition = createMock(classOf[Partition])
     val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
 
@@ -975,7 +975,7 @@ class ReplicaFetcherThreadTest {
 
     val mockBlockingSend: BlockingSend = createNiceMock(classOf[BlockingSend])
 
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
 
     val partition: Partition = createNiceMock(classOf[Partition])
     expect(partition.localLogOrException).andReturn(log)
@@ -1019,7 +1019,7 @@ class ReplicaFetcherThreadTest {
     assertEquals(records.sizeInBytes(), brokerTopicStats.allTopicsStats.replicationBytesInRate.get.count())
   }
 
-  def stub(partition: Partition, replicaManager: ReplicaManager, log: Log): Unit = {
+  def stub(partition: Partition, replicaManager: ReplicaManager, log: UnifiedLog): Unit = {
     expect(replicaManager.localLogOrException(t1p0)).andReturn(log).anyTimes()
     expect(replicaManager.getPartitionOrException(t1p0)).andReturn(partition).anyTimes()
     expect(replicaManager.localLogOrException(t1p1)).andReturn(log).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index c3a039a..74d68c6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -21,7 +21,7 @@ import java.util.{Collections, Optional, Properties}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Partition
-import kafka.log.{Log, LogManager, LogOffsetSnapshot}
+import kafka.log.{UnifiedLog, LogManager, LogOffsetSnapshot}
 import kafka.utils._
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.metrics.Metrics
@@ -208,7 +208,7 @@ class ReplicaManagerQuotasTest {
     val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler])
 
     //Create log which handles both a regular read and a 0 bytes read
-    val log: Log = createNiceMock(classOf[Log])
+    val log: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     expect(log.logStartOffset).andReturn(0L).anyTimes()
     expect(log.logEndOffset).andReturn(20L).anyTimes()
     expect(log.highWatermark).andReturn(5).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 90fc800..987b7c8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1707,7 +1707,7 @@ class ReplicaManagerTest {
     val tp = new TopicPartition(topic, topicPartition)
     val maxProducerIdExpirationMs = 30000
     val segments = new LogSegments(tp)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "")
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "")
     val producerStateManager = new ProducerStateManager(tp, logDir, maxProducerIdExpirationMs, time)
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
@@ -1725,7 +1725,7 @@ class ReplicaManagerTest {
       producerStateManager))
     val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, mockScheduler, time, tp, mockLogDirFailureChannel)
-    val mockLog = new Log(
+    val mockLog = new UnifiedLog(
       logStartOffset = offsets.logStartOffset,
       localLog = localLog,
       brokerTopicStats = mockBrokerTopicStats,
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 97637b1..33776cf 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -20,7 +20,7 @@ package kafka.server.epoch
 import java.io.{File, RandomAccessFile}
 import java.util.Properties
 import kafka.api.ApiVersion
-import kafka.log.{Log, LogLoader}
+import kafka.log.{UnifiedLog, LogLoader}
 import kafka.server.KafkaConfig._
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.tools.DumpLogSegments
@@ -418,12 +418,12 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
   }
 
   private def getLogFile(broker: KafkaServer, partition: Int): File = {
-    val log: Log = getLog(broker, partition)
+    val log: UnifiedLog = getLog(broker, partition)
     log.flush()
     log.dir.listFiles.filter(_.getName.endsWith(".log"))(0)
   }
 
-  private def getLog(broker: KafkaServer, partition: Int): Log = {
+  private def getLog(broker: KafkaServer, partition: Int): UnifiedLog = {
     broker.logManager.getLog(new TopicPartition(topic, partition)).orNull
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index c7dbf0a..37bb2a9 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -19,7 +19,7 @@ package kafka.server.epoch
 import java.io.File
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.log.{Log, LogManager}
+import kafka.log.{UnifiedLog, LogManager}
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
 import kafka.utils.{MockTime, TestUtils}
@@ -58,7 +58,7 @@ class OffsetsForLeaderEpochTest {
     val request = Seq(newOffsetForLeaderTopic(tp, RecordBatch.NO_PARTITION_LEADER_EPOCH, epochRequested))
 
     //Stubs
-    val mockLog: Log = createNiceMock(classOf[Log])
+    val mockLog: UnifiedLog = createNiceMock(classOf[UnifiedLog])
     val logManager: LogManager = createNiceMock(classOf[LogManager])
     expect(mockLog.endOffsetForEpoch(epochRequested)).andReturn(Some(offsetAndEpoch))
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 6a316e3..a8c5002 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,7 +17,7 @@
 
 package unit.kafka.server.metadata
 
-import kafka.log.Log
+import kafka.log.UnifiedLog
 import kafka.server.metadata.BrokerMetadataPublisher
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
@@ -108,8 +108,8 @@ class BrokerMetadataPublisherTest {
   private def mockLog(
     topicId: Uuid,
     topicPartition: TopicPartition
-  ): Log = {
-    val log = Mockito.mock(classOf[Log])
+  ): UnifiedLog = {
+    val log = Mockito.mock(classOf[UnifiedLog])
     Mockito.when(log.topicId).thenReturn(Some(topicId))
     Mockito.when(log.topicPartition).thenReturn(topicPartition)
     log
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index ea3304d..bd2aae8 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.Properties
 
-import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, LogTestUtils}
+import kafka.log.{AppendOrigin, UnifiedLog, LogConfig, LogManager, LogTestUtils}
 import kafka.server.{BrokerTopicStats, FetchLogEnd, LogDirFailureChannel}
 import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
 import kafka.utils.{MockTime, TestUtils}
@@ -53,13 +53,13 @@ class DumpLogSegmentsTest {
   val time = new MockTime(0, 0)
 
   val batches = new ArrayBuffer[BatchInfo]
-  var log: Log = _
+  var log: UnifiedLog = _
 
   @BeforeEach
   def setUp(): Unit = {
     val props = new Properties
     props.setProperty(LogConfig.IndexIntervalBytesProp, "128")
-    log = Log(logDir, LogConfig(props), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+    log = UnifiedLog(logDir, LogConfig(props), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60 * 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
       logDirFailureChannel = new LogDirFailureChannel(10), topicId = None, keepPartitionMetadataFile = true)
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index ab61683..0f23519 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -19,7 +19,7 @@ package kafka.utils
 import java.util.Properties
 import java.util.concurrent.atomic._
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
-import kafka.log.{LoadLogParams, LocalLog, Log, LogConfig, LogLoader, LogManager, LogSegments, ProducerStateManager}
+import kafka.log.{LoadLogParams, LocalLog, UnifiedLog, LogConfig, LogLoader, LogManager, LogSegments, ProducerStateManager}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.TestUtils.retry
 import org.junit.jupiter.api.Assertions._
@@ -119,10 +119,10 @@ class SchedulerTest {
     val logConfig = LogConfig(new Properties())
     val brokerTopicStats = new BrokerTopicStats
     val maxProducerIdExpirationMs = 60 * 60 * 1000
-    val topicPartition = Log.parseTopicPartitionName(logDir)
+    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs, mockTime)
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
@@ -140,7 +140,7 @@ class SchedulerTest {
       producerStateManager))
     val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, scheduler, mockTime, topicPartition, logDirFailureChannel)
-    val log = new Log(logStartOffset = offsets.logStartOffset,
+    val log = new UnifiedLog(logStartOffset = offsets.logStartOffset,
       localLog = localLog,
       brokerTopicStats, LogManager.ProducerIdExpirationCheckIntervalMs,
       leaderEpochCache, producerStateManager,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bd9c02c..3202d6c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1243,7 +1243,7 @@ object TestUtils extends Logging {
         topicPartitions.forall { tp =>
           !Arrays.asList(new File(logDir).list()).asScala.exists { partitionDirectoryName =>
             partitionDirectoryName.startsWith(tp.topic + "-" + tp.partition) &&
-              partitionDirectoryName.endsWith(Log.DeleteDirSuffix)
+              partitionDirectoryName.endsWith(UnifiedLog.DeleteDirSuffix)
           }
         }
       }
