kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] 02/02: KAFKA-8813: Refresh log config if it's updated before initialization (#7305)
Date Tue, 15 Oct 2019 18:30:14 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 50d9b0c9111f13c91ae9fd6b525977e90db158fa
Author: Vikas Singh <vikas@confluent.io>
AuthorDate: Tue Oct 15 11:22:26 2019 -0700

    KAFKA-8813: Refresh log config if it's updated before initialization (#7305)
    
    A partition log is initialized in the following steps:
    
    1. Fetch log config from ZK
    2. Call LogManager.getOrCreateLog, which creates the Log object, then
    3. Register the Log object
    
    Step #3 enables the configuration update thread to deliver configuration
    updates to the log. But if any update arrives between steps #1 and #3,
    that update is missed. This breaks the following use case:
    
    1. Create a topic with the default configuration, and immediately after that
    2. Update the configuration of the topic
    
    There is a race condition here, and in some cases the update made in the
    second step gets dropped.
    
    This change fixes the race by tracking updates that arrive between steps #1
    and #3. Once a Partition has finished initializing its log, it checks whether
    it missed any update; if so, the configuration is read from ZK again.
    
    Added unit tests to make sure a dirty configuration is refreshed. Tested on
    a local cluster to make sure that topic configurations and updates to them
    are handled correctly.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  31 ++++--
 core/src/main/scala/kafka/log/Log.scala            |  13 ++-
 core/src/main/scala/kafka/log/LogManager.scala     |  55 ++++++++++-
 .../main/scala/kafka/server/ConfigHandler.scala    |  19 ++--
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  20 ++--
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 105 ++++++++++++++++++++-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  98 +++++++++++++++++++
 core/src/test/scala/unit/kafka/log/LogTest.scala   |   4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  13 ++-
 9 files changed, 318 insertions(+), 40 deletions(-)
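
For orientation, the bookkeeping pattern this commit adds to LogManager can be sketched roughly as follows. This is a minimal, self-contained illustration, not the committed code: the method names (initializingLog, topicConfigUpdated, brokerConfigUpdated, finishedInitializingLog) match the diff below, but the TopicPartition and LogConfig stand-ins and the applyConfig callback are simplified placeholders introduced only for this sketch.

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    object InitializingLogTracker {
      // Simplified stand-ins for the real Kafka types (illustrative only)
      case class TopicPartition(topic: String, partition: Int)
      type LogConfig = Map[String, String]

      // false = clean, true = a config update arrived while the log was initializing
      private val partitionsInitializing =
        new ConcurrentHashMap[TopicPartition, Boolean]().asScala

      // Called before fetching the config and creating the log (step #1)
      def initializingLog(tp: TopicPartition): Unit =
        partitionsInitializing(tp) = false

      // Config update thread: mark in-flight partitions of this topic as dirty
      def topicConfigUpdated(topic: String): Unit =
        partitionsInitializing.keys.filter(_.topic == topic)
          .foreach(tp => partitionsInitializing.replace(tp, false, true))

      // Broker-level config update: mark every in-flight partition as dirty
      def brokerConfigUpdated(): Unit =
        partitionsInitializing.keys
          .foreach(tp => partitionsInitializing.replace(tp, false, true))

      // Called after the log is registered (step #3); re-fetch the config if dirty
      def finishedInitializingLog(tp: TopicPartition,
                                  applyConfig: LogConfig => Unit,
                                  fetchLogConfig: () => LogConfig): Unit = {
        if (partitionsInitializing.getOrElse(tp, false))
          applyConfig(fetchLogConfig())
        partitionsInitializing -= tp
      }
    }
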

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c3282b7..5372258 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -313,17 +313,28 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
-    val props = stateStore.fetchTopicConfig()
-    val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
-    val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica)
-    val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
-      info(s"No checkpointed highwatermark is found for partition $topicPartition")
-      0L
+  // Visible for testing
+  private[cluster] def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
+    val fetchLogConfig = () => {
+      val props = stateStore.fetchTopicConfig()
+      LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
+    }
+
+    logManager.initializingLog(topicPartition)
+    var maybeLog: Option[Log] = None
+    try {
+      val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig(), isNew, isFutureReplica)
+      val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
+        info(s"No checkpointed highwatermark is found for partition $topicPartition")
+        0L
+      }
+      val initialHighWatermark = log.updateHighWatermark(checkpointHighWatermark)
+      info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark")
+      maybeLog = Some(log)
+      log
+    } finally {
+      logManager.finishedInitializingLog(topicPartition, maybeLog, fetchLogConfig)
     }
-    val initialHighWatermark = log.updateHighWatermark(checkpointHighWatermark)
-    info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark")
-    log
   }
 
   def getReplica(replicaId: Int): Option[Replica] = Option(remoteReplicasMap.get(replicaId))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 94997a1..5e04c3c 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -243,16 +243,15 @@ class Log(@volatile var dir: File,
       0
   }
 
-  def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
+  def updateConfig(newConfig: LogConfig): Unit = {
     val oldConfig = this.config
     this.config = newConfig
-    if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) {
-      val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
-      val newRecordVersion = newConfig.messageFormatVersion.recordVersion
-      if (newRecordVersion.precedes(oldRecordVersion))
-        warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
+    val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
+    val newRecordVersion = newConfig.messageFormatVersion.recordVersion
+    if (newRecordVersion.precedes(oldRecordVersion))
+      warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
+    if (newRecordVersion.value != oldRecordVersion.value)
       initializeLeaderEpochCache()
-    }
   }
 
   private def checkIfMemoryMappedBufferClosed(): Unit = {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c90ea7a..075ba74 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -82,6 +82,13 @@ class LogManager(logDirs: Seq[File],
   @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
 
+  // This map contains all partitions whose logs are getting loaded and initialized. If log configuration
+  // of these partitions get updated at the same time, the corresponding entry in this map is set to "true",
+  // which triggers a config reload after initialization is finished (to get the latest config value).
+  // See KAFKA-8813 for more detail on the race condition
+  // Visible for testing
+  private[log] val partitionsInitializing = new ConcurrentHashMap[TopicPartition, Boolean]().asScala
+
   def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
     this._currentDefaultConfig = logConfig
   }
@@ -660,6 +667,48 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
+   * Method to indicate that logs are getting initialized for the partition passed in as argument.
+   * This method should always be followed by [[kafka.log.LogManager#finishedInitializingLog]] to indicate that log
+   * initialization is done.
+   */
+  def initializingLog(topicPartition: TopicPartition): Unit = {
+    partitionsInitializing(topicPartition) = false
+  }
+
+  /**
+   * Mark the partition configuration for all partitions that are getting initialized for topic
+   * as dirty. That will result in reloading of configuration once initialization is done.
+   */
+  def topicConfigUpdated(topic: String): Unit = {
+    partitionsInitializing.keys.filter(_.topic() == topic).foreach {
+      topicPartition => partitionsInitializing.replace(topicPartition, false, true)
+    }
+  }
+
+  /**
+   * Mark all in progress partitions having dirty configuration if broker configuration is updated.
+   */
+  def brokerConfigUpdated(): Unit = {
+    partitionsInitializing.keys.foreach {
+      topicPartition => partitionsInitializing.replace(topicPartition, false, true)
+    }
+  }
+
+  /**
+   * Method to indicate that the log initialization for the partition passed in as argument is
+   * finished. This method should follow a call to [[kafka.log.LogManager#initializingLog]]
+   */
+  def finishedInitializingLog(topicPartition: TopicPartition,
+                              maybeLog: Option[Log],
+                              fetchLogConfig: () => LogConfig): Unit = {
+    if (partitionsInitializing(topicPartition)) {
+      maybeLog.foreach(_.updateConfig(fetchLogConfig()))
+    }
+
+    partitionsInitializing -= topicPartition
+  }
+
+  /**
    * If the log already exists, just return a copy of the existing log
   * Otherwise if isNew=true or if there is no offline log directory, create a log for the given topic and the given partition
    * Otherwise throw KafkaStorageException
@@ -955,9 +1004,9 @@ class LogManager(logDirs: Seq[File],
   def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values
 
   def logsByTopic(topic: String): Seq[Log] = {
-    (currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, _) =>
-      topicPartition.topic() == topic
-    }.map { case (_, log) => log }
+    (currentLogs.toList ++ futureLogs.toList).collect {
+      case (topicPartition, log) if topicPartition.topic == topic => log
+    }
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index a80d0f5..2395dcd 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -50,11 +50,11 @@ trait ConfigHandler {
   */
 class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers, kafkaController: KafkaController) extends ConfigHandler with Logging {
 
-  def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
-    // Validate the configurations.
-    val configNamesToExclude = excludedConfigs(topic, topicConfig)
-
-    val logs = logManager.logsByTopic(topic).toBuffer
+  private def updateLogConfig(topic: String,
+                              topicConfig: Properties,
+                              configNamesToExclude: Set[String]): Unit = {
+    logManager.topicConfigUpdated(topic)
+    val logs = logManager.logsByTopic(topic)
     if (logs.nonEmpty) {
      /* combine the default properties with the overrides in zk to create the new LogConfig */
       val props = new Properties()
@@ -62,8 +62,15 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
         if (!configNamesToExclude.contains(key)) props.put(key, value)
       }
       val logConfig = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
-      logs.foreach(_.updateConfig(topicConfig.asScala.keySet, logConfig))
+      logs.foreach(_.updateConfig(logConfig))
     }
+  }
+
+  def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
+    // Validate the configurations.
+    val configNamesToExclude = excludedConfigs(topic, topicConfig)
+
+    updateLogConfig(topic, topicConfig, configNamesToExclude)
 
     def updateThrottledList(prop: String, quotaManager: ReplicationQuotaManager) = {
      if (topicConfig.containsKey(prop) && topicConfig.getProperty(prop).length > 0) {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index b517875..bf3d988 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -612,6 +612,18 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
     // validation, no additional validation is performed.
   }
 
+  private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit = {
+    logManager.brokerConfigUpdated()
+    logManager.allLogs.foreach { log =>
+      val props = mutable.Map.empty[Any, Any]
+      props ++= newBrokerDefaults
+      props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
+
+      val logConfig = LogConfig(props.asJava)
+      log.updateConfig(logConfig)
+    }
+  }
+
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     val currentLogConfig = logManager.currentDefaultConfig
     val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
@@ -626,14 +638,8 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Brok
 
     logManager.reconfigureDefaultLogConfig(LogConfig(newBrokerDefaults))
 
-    logManager.allLogs.foreach { log =>
-      val props = mutable.Map.empty[Any, Any]
-      props ++= newBrokerDefaults.asScala
-      props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
+    updateLogsConfig(newBrokerDefaults.asScala)
 
-      val logConfig = LogConfig(props.asJava)
-      log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
-    }
    if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !origUncleanLeaderElectionEnable) {
       server.kafkaController.enableDefaultUncleanLeaderElection()
     }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 8e812eb..6204174 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, ListOffsetRequest}
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
-import org.mockito.Mockito.{doNothing, mock, when}
+import org.mockito.Mockito.{doAnswer, doNothing, mock, spy, times, verify, when}
 import org.scalatest.Assertions.assertThrows
 import org.mockito.ArgumentMatchers
 import org.mockito.invocation.InvocationOnMock
@@ -1545,6 +1545,108 @@ class PartitionTest {
    assertEquals(Set(), Metrics.defaultRegistry().allMetrics().asScala.keySet.filter(_.getType == "Partition"))
   }
 
+  /**
+   * Test when log is getting initialized, its config remains untouched after initialization is done.
+   */
+  @Test
+  def testLogConfigNotDirty(): Unit = {
+    val spyLogManager = spy(logManager)
+    val partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      localBrokerId = brokerId,
+      time,
+      stateStore,
+      delayedOperations,
+      metadataCache,
+      spyLogManager)
+
+    partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+
+    // Validate that initializingLog and finishedInitializingLog was called
+    verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
+    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
+
+    // We should get config from ZK only once
+    verify(stateStore).fetchTopicConfig()
+  }
+
+  /**
+   * Test when log is getting initialized, its config gets reloaded if Topic config gets changed
+   * before initialization is done.
+   */
+  @Test
+  def testLogConfigDirtyAsTopicUpdated(): Unit = {
+    val spyLogManager = spy(logManager)
+    doAnswer(new Answer[Unit] {
+      def answer(invocation: InvocationOnMock): Unit = {
+        logManager.initializingLog(topicPartition)
+        logManager.topicConfigUpdated(topicPartition.topic())
+      }
+    }).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
+
+    val partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      localBrokerId = brokerId,
+      time,
+      stateStore,
+      delayedOperations,
+      metadataCache,
+      spyLogManager)
+
+    partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+
+    // Validate that initializingLog and finishedInitializingLog was called
+    verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
+    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
+
+    // We should get config from ZK twice, once before log is created, and second time once
+    // we find log config is dirty and refresh it.
+    verify(stateStore, times(2)).fetchTopicConfig()
+  }
+
+  /**
+   * Test when log is getting initialized, its config gets reloaded if Broker config gets changed
+   * before initialization is done.
+   */
+  @Test
+  def testLogConfigDirtyAsBrokerUpdated(): Unit = {
+    val spyLogManager = spy(logManager)
+    doAnswer(new Answer[Unit] {
+      def answer(invocation: InvocationOnMock): Unit = {
+        logManager.initializingLog(topicPartition)
+        logManager.brokerConfigUpdated()
+      }
+    }).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
+
+    val partition = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      localBrokerId = brokerId,
+      time,
+      stateStore,
+      delayedOperations,
+      metadataCache,
+      spyLogManager)
+
+    partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+
+    // Validate that initializingLog and finishedInitializingLog was called
+    verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
+    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
+
+    // We should get config from ZK twice, once before log is created, and second time once
+    // we find log config is dirty and refresh it.
+    verify(stateStore, times(2)).fetchTopicConfig()
+  }
+
   private def seedLogData(log: Log, numRecords: Int, leaderEpoch: Int): Unit = {
     for (i <- 0 until numRecords) {
       val records = MemoryRecords.withRecords(0L, CompressionType.NONE, leaderEpoch,
@@ -1552,5 +1654,4 @@ class PartitionTest {
       log.appendAsLeader(records, leaderEpoch)
     }
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 91f2a40..a7c17f4 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -26,6 +26,7 @@ import kafka.utils._
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.ArgumentMatchers.any
@@ -468,4 +469,101 @@ class LogManagerTest {
     log.read(offset, maxLength, isolation = FetchLogEnd, minOneMessage = true)
   }
 
+  /**
+   * Test when a configuration of a topic is updated while its log is getting initialized,
+   * the config is refreshed when log initialization is finished.
+   */
+  @Test
+  def testTopicConfigChangeUpdatesLogConfig(): Unit = {
+    val testTopicOne = "test-topic-one"
+    val testTopicTwo = "test-topic-two"
+    val testTopicOnePartition: TopicPartition = new TopicPartition(testTopicOne, 1)
+    val testTopicTwoPartition: TopicPartition = new TopicPartition(testTopicTwo, 1)
+    val mockLog: Log = EasyMock.mock(classOf[Log])
+
+    logManager.initializingLog(testTopicOnePartition)
+    logManager.initializingLog(testTopicTwoPartition)
+
+    logManager.topicConfigUpdated(testTopicOne)
+
+    val logConfig: LogConfig = null
+    var configUpdated = false
+    logManager.finishedInitializingLog(testTopicOnePartition, Some(mockLog), () => {
+      configUpdated = true
+      logConfig
+    })
+    assertTrue(configUpdated)
+
+    var configNotUpdated = true
+    logManager.finishedInitializingLog(testTopicTwoPartition, Some(mockLog), () => {
+      configNotUpdated = false
+      logConfig
+    })
+    assertTrue(configNotUpdated)
+  }
+
+  /**
+   * Test if an error occurs when creating log, log manager removes corresponding
+   * topic partition from the list of initializing partitions.
+   */
+  @Test
+  def testConfigChangeGetsCleanedUp(): Unit = {
+    val testTopicPartition: TopicPartition = new TopicPartition("test-topic", 1)
+
+    logManager.initializingLog(testTopicPartition)
+
+    val logConfig: LogConfig = null
+    var configUpdateNotCalled = true
+    logManager.finishedInitializingLog(testTopicPartition, None, () => {
+      configUpdateNotCalled = false
+      logConfig
+    })
+
+    assertTrue(logManager.partitionsInitializing.isEmpty)
+    assertTrue(configUpdateNotCalled)
+  }
+
+  /**
+   * Test when a broker configuration change happens all logs in process of initialization
+   * pick up latest config when finished with initialization.
+   */
+  @Test
+  def testBrokerConfigChangeDeliveredToAllLogs(): Unit = {
+    val testTopicOne = "test-topic-one"
+    val testTopicTwo = "test-topic-two"
+    val testTopicOnePartition: TopicPartition = new TopicPartition(testTopicOne, 1)
+    val testTopicTwoPartition: TopicPartition = new TopicPartition(testTopicTwo, 1)
+    val mockLog: Log = EasyMock.mock(classOf[Log])
+
+    logManager.initializingLog(testTopicOnePartition)
+    logManager.initializingLog(testTopicTwoPartition)
+
+    logManager.brokerConfigUpdated()
+
+    val logConfig: LogConfig = null
+    var totalChanges = 0
+    logManager.finishedInitializingLog(testTopicOnePartition, Some(mockLog), () => {
+      totalChanges += 1
+      logConfig
+    })
+    logManager.finishedInitializingLog(testTopicTwoPartition, Some(mockLog), () => {
+      totalChanges += 1
+      logConfig
+    })
+
+    assertEquals(2, totalChanges)
+  }
+
+  /**
+   * Test even if no log is getting initialized, if config change events are delivered
+   * things continue to work correctly. This test should not throw.
+   *
+   * This makes sure that events can be delivered even when no log is getting initialized.
+   */
+  @Test
+  def testConfigChangesWithNoLogGettingInitialized(): Unit = {
+    logManager.brokerConfigUpdated()
+    logManager.topicConfigUpdated("test-topic")
+    assertTrue(logManager.partitionsInitializing.isEmpty)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c5de28e..fdf40c6 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2523,7 +2523,7 @@ class LogTest {
 
    val downgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
       maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_10_2_IV0.shortVersion)
-    log.updateConfig(Set(LogConfig.MessageFormatVersionProp), downgradedLogConfig)
+    log.updateConfig(downgradedLogConfig)
     assertLeaderEpochCacheEmpty(log)
 
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())),
@@ -2542,7 +2542,7 @@ class LogTest {
 
    val upgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1,
       maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_11_0_IV0.shortVersion)
-    log.updateConfig(Set(LogConfig.MessageFormatVersionProp), upgradedLogConfig)
+    log.updateConfig(upgradedLogConfig)
    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
     assertEquals(Some(5), log.latestEpoch)
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 438dedf..484ee2d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -53,6 +53,7 @@ import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import org.mockito.ArgumentMatchers
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq}
@@ -972,15 +973,21 @@ class ReplicaManagerTest {
     }
 
     // Expect to call LogManager.truncateTo exactly once
+    val topicPartitionObj = new TopicPartition(topic, topicPartition)
     val mockLogMgr: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(mockLogMgr.liveLogDirs).andReturn(config.logDirs.map(new File(_).getAbsoluteFile)).anyTimes
     EasyMock.expect(mockLogMgr.currentDefaultConfig).andReturn(LogConfig())
-    EasyMock.expect(mockLogMgr.getOrCreateLog(new TopicPartition(topic, topicPartition),
+    EasyMock.expect(mockLogMgr.getOrCreateLog(topicPartitionObj,
       LogConfig(), isNew = false, isFuture = false)).andReturn(mockLog).anyTimes
     if (expectTruncation) {
-      EasyMock.expect(mockLogMgr.truncateTo(Map(new TopicPartition(topic, topicPartition) -> offsetFromLeader),
+      EasyMock.expect(mockLogMgr.truncateTo(Map(topicPartitionObj -> offsetFromLeader),
         isFuture = false)).once
     }
+    EasyMock.expect(mockLogMgr.initializingLog(topicPartitionObj)).anyTimes
+
+    EasyMock.expect(mockLogMgr.finishedInitializingLog(
+      EasyMock.eq(topicPartitionObj), EasyMock.anyObject(), EasyMock.anyObject())).anyTimes
+
     EasyMock.replay(mockLogMgr)
 
     val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
@@ -1015,7 +1022,7 @@ class ReplicaManagerTest {
 
     // Mock network client to show leader offset of 5
     val quota = QuotaFactory.instantiate(config, metrics, time, "")
-    val blockingSend = new ReplicaFetcherMockBlockingSend(Map(new TopicPartition(topic, topicPartition) ->
+    val blockingSend = new ReplicaFetcherMockBlockingSend(Map(topicPartitionObj ->
      new EpochEndOffset(leaderEpochFromLeader, offsetFromLeader)).asJava, BrokerEndPoint(1, "host1" ,1), time)
    val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr,
       new AtomicBoolean(false), quota, mockBrokerTopicStats,

