kafka-commits mailing list archives

From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6949; alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
Date Tue, 26 Jun 2018 06:39:32 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9ea81ba  KAFKA-6949; alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
9ea81ba is described below

commit 9ea81baf34113ccb05244ea9fb63199e7f071bb0
Author: Dong Lin <lindong28@gmail.com>
AuthorDate: Mon Jun 25 23:39:02 2018 -0700

    KAFKA-6949; alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
    
    A NoSuchElementException will be thrown if ReplicaAlterDirThread replaces the current
    replica with the future replica right before the request handler thread executes
    `futureReplica.log.get.dir.getParent` in ReplicaManager.alterReplicaLogDirs(). The
    solution is to grab the partition lock when the request handler thread attempts to
    check the destination log directory of the future replica.
    
    Author: Dong Lin <lindong28@gmail.com>
    
    Reviewers: Jun Rao <junrao@gmail.com>
    
    Closes #5081 from lindong28/KAFKA-6949
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 68 ++++++++++++++++------
 .../main/scala/kafka/server/ReplicaManager.scala   | 17 +++---
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 54 +++++++++++++++--
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  2 +-
 4 files changed, 110 insertions(+), 31 deletions(-)
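
The heart of the fix in Partition.scala is a double-checked locking pattern: the log-end-offset
comparison is first made cheaply through Option, then repeated under leaderIsrUpdateLock's write
lock, where the future replica may turn out to have been removed by another thread in the
meantime. Below is a minimal, self-contained Scala sketch of just that pattern; the names
(SwapSketch, futureLeo, currentLeo, maybeSwap) are hypothetical stand-ins for the real Partition
state, not the committed code.

import java.util.concurrent.locks.ReentrantReadWriteLock

class SwapSketch {
  // Stand-in for Partition.leaderIsrUpdateLock (hypothetical field names throughout).
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
  private var futureLeo: Option[Long] = Some(100L) // future replica's log end offset, None once removed
  private val currentLeo: Long = 100L              // current replica's log end offset

  def maybeSwap(): Boolean = {
    // Fast-path check without the write lock. Option.contains is None-safe, so a
    // future replica removed by another thread simply yields false here.
    if (futureLeo.contains(currentLeo)) {
      val writeLock = leaderIsrUpdateLock.writeLock()
      writeLock.lock()
      try {
        // Re-check under the write lock: the future replica may have been removed
        // (or the offsets may have diverged) between the two checks.
        futureLeo match {
          case Some(leo) if leo == currentLeo =>
            futureLeo = None // the swap happens atomically w.r.t. other lock holders
            true
          case _ => false
        }
      } finally writeLock.unlock()
    } else false
  }
}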

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ab1ec4..b80c344 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -149,10 +149,10 @@ class Partition(val topic: String,
     * @return true iff the future replica is created
     */
   def maybeCreateFutureReplica(logDir: String): Boolean = {
-    // The readLock is needed to make sure that while the caller checks the log directory of the
+    // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
-    inReadLock(leaderIsrUpdateLock) {
+    inWriteLock(leaderIsrUpdateLock) {
       val currentReplica = getReplica().get
       if (currentReplica.log.get.dir.getParent == logDir)
         false
@@ -207,29 +207,52 @@ class Partition(val topic: String,
     allReplicasMap.remove(replicaId)
   }
 
-  def removeFutureLocalReplica() {
+  def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      getReplica(Request.FutureLocalReplicaId) match {
+        case Some(futureReplica) =>
+          if (futureReplica.log.get.dir.getParent != newDestinationDir)
+            true
+          else
+            false
+        case None => false
+      }
+    }
+  }
+
+  def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
     inWriteLock(leaderIsrUpdateLock) {
       allReplicasMap.remove(Request.FutureLocalReplicaId)
+      if (deleteFromLogDir)
+        logManager.asyncDelete(topicPartition, isFuture = true)
     }
   }
 
-  // Return true iff the future log has caught up with the current log for this partition
+  // Return true iff the future replica exists and it has caught up with the current replica for this partition
   // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplica = getReplica(Request.FutureLocalReplicaId).get
-    if (replica.logEndOffset == futureReplica.logEndOffset) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
-        if (replica.logEndOffset == futureReplica.logEndOffset) {
-          logManager.replaceCurrentWithFutureLog(topicPartition)
-          replica.log = futureReplica.log
-          futureReplica.log = None
-          allReplicasMap.remove(Request.FutureLocalReplicaId)
-          true
-        } else false
+        getReplica(Request.FutureLocalReplicaId) match {
+          case Some(futureReplica) =>
+            if (replica.logEndOffset == futureReplica.logEndOffset) {
+              logManager.replaceCurrentWithFutureLog(topicPartition)
+              replica.log = futureReplica.log
+              futureReplica.log = None
+              allReplicasMap.remove(Request.FutureLocalReplicaId)
+              true
+            } else false
+          case None =>
+            // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
+            // In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
+            // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition
+            false
+        }
       }
     } else false
   }
@@ -550,15 +573,22 @@ class Partition(val topic: String,
   }
 
   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
-      if (isFuture)
-        getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
-      else {
-        // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
-        // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
+    inReadLock(leaderIsrUpdateLock) {
+      if (isFuture) {
+        // The read lock is needed to handle race condition if request handler thread tries to
+        // remove future replica after receiving AlterReplicaLogDirsRequest.
         inReadLock(leaderIsrUpdateLock) {
-           getReplicaOrException().log.get.appendAsFollower(records)
+          getReplica(Request.FutureLocalReplicaId) match {
+            case Some(replica) => replica.log.get.appendAsFollower(records)
+            case None => // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
+          }
         }
+      } else {
+        // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
+        // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
+        getReplicaOrException().log.get.appendAsFollower(records)
       }
+    }
   }
 
   def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 965595b..ed9559f 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -577,14 +577,17 @@ class ReplicaManager(val config: KafkaConfig,
           if (!logManager.isLogDirOnline(destinationDir))
             throw new KafkaStorageException(s"Log directory $destinationDir is offline")
 
-          // Stop current replica movement if the destinationDir is different from the existing destination log directory
-          getReplica(topicPartition, Request.FutureLocalReplicaId) match {
-            case Some(futureReplica) =>
-              if (futureReplica.log.get.dir.getParent != destinationDir) {
+          getPartition(topicPartition) match {
+            case Some(partition) =>
+              if (partition eq ReplicaManager.OfflinePartition)
+                throw new KafkaStorageException(s"Partition $topicPartition is offline")
+
+              // Stop current replica movement if the destinationDir is different from the existing destination log directory
+              if (partition.futureReplicaDirChanged(destinationDir)) {
                 replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
-                getPartition(topicPartition).get.removeFutureLocalReplica()
-                logManager.asyncDelete(topicPartition, isFuture = true)
+                partition.removeFutureLocalReplica()
               }
+
             case None =>
           }
 
@@ -1418,7 +1421,7 @@ class ReplicaManager(val config: KafkaConfig,
       replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
       replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition))
 
-      partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica())
+      partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false))
       newOfflinePartitions.foreach { topicPartition =>
         val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
         partition.removePartitionMetrics()
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index fe9038a..41bdefd 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -19,8 +19,10 @@ package kafka.cluster
 import java.io.File
 import java.nio.ByteBuffer
 import java.util.Properties
+import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
 
+import kafka.api.Request
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{LogConfig, LogManager, CleanerConfig}
 import kafka.server._
@@ -44,7 +46,8 @@ class PartitionTest {
   val metrics = new Metrics
 
   var tmpDir: File = _
-  var logDir: File = _
+  var logDir1: File = _
+  var logDir2: File = _
   var replicaManager: ReplicaManager = _
   var logManager: LogManager = _
   var logConfig: LogConfig = _
@@ -58,13 +61,14 @@ class PartitionTest {
     logConfig = LogConfig(logProps)
 
     tmpDir = TestUtils.tempDir()
-    logDir = TestUtils.randomPartitionLogDir(tmpDir)
+    logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
+    logDir2 = TestUtils.randomPartitionLogDir(tmpDir)
     logManager = TestUtils.createLogManager(
-      logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
+      logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
     logManager.startup()
 
     val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
-    brokerProps.put("log.dir", logDir.getAbsolutePath)
+    brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
     val brokerConfig = KafkaConfig.fromProps(brokerProps)
     replicaManager = new ReplicaManager(
       config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
@@ -84,6 +88,48 @@ class PartitionTest {
   }
 
   @Test
+  // Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently
+  def testMaybeReplaceCurrentWithFutureReplica(): Unit = {
+    val latch = new CountDownLatch(1)
+
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
+    val log1 = logManager.getOrCreateLog(topicPartition, logConfig)
+    logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
+    val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true)
+    val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1))
+    val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    partition.addReplicaIfNotExists(futureReplica)
+    partition.addReplicaIfNotExists(currentReplica)
+    assertEquals(Some(currentReplica), partition.getReplica(brokerId))
+    assertEquals(Some(futureReplica), partition.getReplica(Request.FutureLocalReplicaId))
+
+    val thread1 = new Thread {
+      override def run(): Unit = {
+        latch.await()
+        partition.removeFutureLocalReplica()
+      }
+    }
+
+    val thread2 = new Thread {
+      override def run(): Unit = {
+        latch.await()
+        partition.maybeReplaceCurrentWithFutureReplica()
+      }
+    }
+
+    thread1.start()
+    thread2.start()
+
+    latch.countDown()
+    thread1.join()
+    thread2.join()
+    assertEquals(None, partition.getReplica(Request.FutureLocalReplicaId))
+  }
+
+
+  @Test
   def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index da87c30..d2aae2c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -929,7 +929,7 @@ object TestUtils extends Logging {
                        defaultConfig: LogConfig = LogConfig(),
                        cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
                        time: MockTime = new MockTime()): LogManager = {
-    new LogManager(logDirs = logDirs,
+    new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
                    initialOfflineDirs = Array.empty[File],
                    topicConfigs = Map(),
                    initialDefaultConfig = defaultConfig,

