kafka-commits mailing list archives

From mimai...@apache.org
Subject [kafka] branch trunk updated: KAFKA-10314: KafkaStorageException on reassignment when offline log directories exist (#9122)
Date Fri, 04 Sep 2020 15:36:00 GMT
This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 23cade8  KAFKA-10314: KafkaStorageException on reassignment when offline log directories exist (#9122)
23cade8 is described below

commit 23cade850678eeae1bc1227a2a2a4bc09b02f2fc
Author: Noa Resare <noa@resare.com>
AuthorDate: Fri Sep 4 16:34:31 2020 +0100

    KAFKA-10314: KafkaStorageException on reassignment when offline log directories exist (#9122)
    
    Make sure that we set the isNew field in LeaderAndIsrRequest correctly for brokers
    that get added to the replica set on reassignment.
    
    This is tested by creating a variant of ControllerIntegrationTest.testPartitionReassignment()
    that makes one of the log directories on the target broker offline before initiating the
    reassignment. Without the change to the way isNew is set, this fails after a timeout. With
    the change, it succeeds.
    
    To facilitate calling causeLogDirFailure() both from ControllerIntegrationTest and
    LogDirFailureTest, the method was moved to TestUtils along with the other helper
    methods that deal with interacting with KafkaServer instances in test cases.
    
    Reviewers: Mickael Maison <mickael.maison@gmail.com>
---
 .../scala/kafka/controller/KafkaController.scala   |  9 ++++-
 .../controller/ControllerIntegrationTest.scala     | 43 +++++++++++++++++++++-
 .../unit/kafka/server/LogDirFailureTest.scala      | 38 ++-----------------
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 33 ++++++++++++++++-
 4 files changed, 83 insertions(+), 40 deletions(-)
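
For readers following the change, the controller-side idea is that replicas which already existed keep isNew = false (a broker must not recreate them in another log directory if theirs is offline), while replicas added by the reassignment get isNew = true (the target broker may place them in any online log directory). A minimal sketch of that split follows; ReplicaAssignment and LeaderAndIsrSender here are simplified stand-ins for the controller's real types, not the actual API.

    // Minimal sketch of the isNew split; the types below are simplified stand-ins.
    object ReassignmentIsNewSketch {

      // `replicas` is the full target replica set, `addingReplicas` the brokers gaining the
      // partition as part of the reassignment (loosely mirrors kafka.controller.ReplicaAssignment).
      case class ReplicaAssignment(replicas: Seq[Int],
                                   addingReplicas: Seq[Int],
                                   removingReplicas: Seq[Int]) {
        // Replicas that already existed before the reassignment started.
        def originReplicas: Seq[Int] = replicas.diff(addingReplicas)
      }

      // Hypothetical stand-in for the controller's brokerRequestBatch.
      trait LeaderAndIsrSender {
        def addLeaderAndIsrRequestForBrokers(brokers: Seq[Int], isNew: Boolean): Unit
      }

      def sendLeaderAndIsr(batch: LeaderAndIsrSender, assignment: ReplicaAssignment): Unit = {
        // Existing replicas: never recreate them elsewhere if their log dir is offline.
        batch.addLeaderAndIsrRequestForBrokers(assignment.originReplicas, isNew = false)
        // Newly added replicas: allow placement in any online log dir on the target broker.
        batch.addLeaderAndIsrRequestForBrokers(assignment.addingReplicas, isNew = true)
      }
    }

The first hunk below makes exactly this distinction in KafkaController.scala, using the real brokerRequestBatch.
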

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 368af84..1b01953 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -875,8 +875,15 @@ class KafkaController(val config: KafkaConfig,
       case Some(updatedLeaderIsrAndControllerEpoch) =>
         try {
           brokerRequestBatch.newBatch()
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(assignment.replicas, topicPartition,
+          // the isNew flag, when set to true, makes sure that when a replica possibly resided
+          // in a logDir that is offline, we refrain from just creating a new replica in a good
+          // logDir. This is exactly the behavior we want for the original replicas, but not
+          // for the replicas we add in this reassignment. For new replicas, we want to be able
+          // to assign to one of the good logDirs.
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(assignment.originReplicas, topicPartition,
             updatedLeaderIsrAndControllerEpoch, assignment, isNew = false)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(assignment.addingReplicas, topicPartition,
+            updatedLeaderIsrAndControllerEpoch, assignment, isNew = true)
           brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
         } catch {
           case e: IllegalStateException =>
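
Why the flag matters on the receiving broker: when a LeaderAndIsrRequest names a partition for which the broker has no local log, the broker must decide whether it may create one. Below is a rough, illustrative model of that decision, under the assumption that a broker refuses to create a non-new replica while any of its log directories is offline; the names maybeCreateLog, hasOfflineLogDirs and createLogInOnlineDir are hypothetical and are not the real LogManager API.

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.errors.KafkaStorageException

    // Illustrative model only; the helper names are hypothetical, not broker internals.
    object IsNewOnBrokerSketch {
      def maybeCreateLog(tp: TopicPartition,
                         isNew: Boolean,
                         hasOfflineLogDirs: Boolean,
                         createLogInOnlineDir: TopicPartition => Unit): Unit = {
        if (!isNew && hasOfflineLogDirs) {
          // The replica may already live in the offline directory; recreating it in a
          // healthy directory could silently lose data, so the broker refuses.
          throw new KafkaStorageException(s"Cannot create log for $tp while a log directory is offline")
        } else {
          // A replica flagged isNew is known to be brand new, so it can safely be
          // placed in any online log directory.
          createLogInOnlineDir(tp)
        }
      }
    }
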
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index b84addd..4fa0184 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -308,6 +308,44 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testPartitionReassignmentToBrokerWithOfflineLogDir(): Unit = {
+    servers = makeServers(2, logDirCount = 2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+
+    val metricName = s"kafka.controller:type=ControllerStats,name=${ControllerState.AlterPartitionReassignment.rateAndTimeMetricName.get}"
+    val timerCount = timer(metricName).count
+
+    val otherBroker = servers.filter(_.config.brokerId != controllerId).head
+    val otherBrokerId = otherBroker.config.brokerId
+
+    // To have an offline log dir, we need a topicPartition assigned to it
+    val topicPartitionToPutOffline = new TopicPartition("filler", 0)
+    TestUtils.createTopic(
+      zkClient,
+      topicPartitionToPutOffline.topic,
+      partitionReplicaAssignment = Map(topicPartitionToPutOffline.partition -> Seq(otherBrokerId)),
+      servers = servers
+    )
+
+    TestUtils.causeLogDirFailure(TestUtils.Checkpoint, otherBroker, topicPartitionToPutOffline)
+
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId))
+    val reassignment = Map(tp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
+    waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3,
+      "with an offline log directory on the target broker, the partition reassignment stalls")
+    TestUtils.waitUntilTrue(() =>  zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
+      "failed to get updated partition assignment on topic znode after partition reassignment")
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+      "failed to remove reassign partitions path after completion")
+
+    val updatedTimerCount = timer(metricName).count
+    assertTrue(s"Timer count $updatedTimerCount should be greater than $timerCount", updatedTimerCount > timerCount)
+  }
+
+  @Test
   def testPartitionReassignmentWithOfflineReplicaHaltingProgress(): Unit = {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
@@ -769,8 +807,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
                           enableControlledShutdown: Boolean = true,
                           listeners : Option[String] = None,
                           listenerSecurityProtocolMap : Option[String] = None,
-                          controlPlaneListenerName : Option[String] = None) = {
-    val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect, enableControlledShutdown = enableControlledShutdown)
+                          controlPlaneListenerName : Option[String] = None,
+                          logDirCount: Int = 1) = {
+    val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect, enableControlledShutdown = enableControlledShutdown, logDirCount = logDirCount)
     configs.foreach { config =>
       config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString)
       config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 28d60c6..aa2a847 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
 
 import kafka.api.IntegrationTestHarness
 import kafka.controller.{OfflineReplica, PartitionAndReplica}
-import kafka.server.LogDirFailureTest._
+import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll}
 import kafka.utils.{CoreUtils, Exit, TestUtils}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
@@ -151,7 +151,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     val leaderServerId = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get.leader().id()
     val leaderServer = servers.find(_.config.brokerId == leaderServerId).get
 
-    causeLogDirFailure(failureType, leaderServer, partition)
+    TestUtils.causeLogDirFailure(failureType, leaderServer, partition)
 
     // send() should fail due to either KafkaStorageException or NotLeaderOrFollowerException
     try {
@@ -183,7 +183,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     producer.send(record).get()
     TestUtils.consumeRecords(consumer, 1)
 
-    causeLogDirFailure(failureType, leaderServer, partition)
+    TestUtils.causeLogDirFailure(failureType, leaderServer, partition)
 
     TestUtils.waitUntilTrue(() => {
       // ProduceResponse may contain KafkaStorageException and trigger metadata update
@@ -206,31 +206,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
     assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), leaderServerId)))
   }
 
-  private def causeLogDirFailure(failureType: LogDirFailureType,
-                                 leaderServer: KafkaServer,
-                                 partition: TopicPartition): Unit = {
-    // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
-    val localLog = leaderServer.replicaManager.localLogOrException(partition)
-    val logDir = localLog.dir.getParentFile
-    CoreUtils.swallow(Utils.delete(logDir), this)
-    logDir.createNewFile()
-    assertTrue(logDir.isFile)
-
-    if (failureType == Roll) {
-      try {
-        leaderServer.replicaManager.getLog(partition).get.roll()
-        fail("Log rolling should fail with KafkaStorageException")
-      } catch {
-        case e: KafkaStorageException => // This is expected
-      }
-    } else if (failureType == Checkpoint) {
-      leaderServer.replicaManager.checkpointHighWatermarks()
-    }
-
-    // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
-    TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
-    assertTrue(leaderServer.replicaManager.localLog(partition).isEmpty)
-  }
 
   private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
     consumer.subscribe(Collections.singletonList(topic))
@@ -238,10 +213,3 @@ class LogDirFailureTest extends IntegrationTestHarness {
   }
 
 }
-
-object LogDirFailureTest {
-  sealed trait LogDirFailureType
-  case object Roll extends LogDirFailureType
-  case object Checkpoint extends LogDirFailureType
-}
-
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7f8e16c..21f2bc3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -53,7 +53,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.{KafkaFuture, TopicPartition}
 import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
@@ -96,6 +96,10 @@ object TestUtils extends Logging {
   private val committedValue : Array[Byte] = "committed".getBytes(StandardCharsets.UTF_8)
   private val abortedValue : Array[Byte] = "aborted".getBytes(StandardCharsets.UTF_8)
 
+  sealed trait LogDirFailureType
+  case object Roll extends LogDirFailureType
+  case object Checkpoint extends LogDirFailureType
+
   /**
    * Create a temporary directory
    */
@@ -1140,6 +1144,31 @@ object TestUtils extends Logging {
     ), "Failed to hard-delete the delete directory")
   }
 
+
+  def causeLogDirFailure(failureType: LogDirFailureType, leaderServer: KafkaServer, partition: TopicPartition): Unit = {
+    // Make log directory of the partition on the leader broker inaccessible by replacing it with a file
+    val localLog = leaderServer.replicaManager.localLogOrException(partition)
+    val logDir = localLog.dir.getParentFile
+    CoreUtils.swallow(Utils.delete(logDir), this)
+    logDir.createNewFile()
+    assertTrue(logDir.isFile)
+
+    if (failureType == Roll) {
+      try {
+        leaderServer.replicaManager.getLog(partition).get.roll()
+        fail("Log rolling should fail with KafkaStorageException")
+      } catch {
+        case e: KafkaStorageException => // This is expected
+      }
+    } else if (failureType == Checkpoint) {
+      leaderServer.replicaManager.checkpointHighWatermarks()
+    }
+
+    // Wait for ReplicaHighWatermarkCheckpoint to happen so that the log directory of the topic will be offline
+    TestUtils.waitUntilTrue(() => !leaderServer.logManager.isLogDirOnline(logDir.getAbsolutePath), "Expected log directory offline", 3000L)
+    assertTrue(leaderServer.replicaManager.localLog(partition).isEmpty)
+  }
+
   /**
    * Translate the given buffer into a string
    *


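For tests that want to reuse the relocated TestUtils.causeLogDirFailure helper, a minimal sketch of the pattern used by the new controller test above; the object and method names here are illustrative and not part of TestUtils, and the brokers are assumed to have been created with logDirCount >= 2.

    import kafka.server.KafkaServer
    import kafka.utils.TestUtils
    import kafka.zk.KafkaZkClient
    import org.apache.kafka.common.TopicPartition

    // Illustrative helper; in a real test, `zkClient` and `servers` come from the harness.
    object LogDirFailureUsageSketch {
      def putOneLogDirOffline(zkClient: KafkaZkClient, servers: Seq[KafkaServer], brokerId: Int): Unit = {
        // An offline log dir needs at least one partition living in it, so create a filler topic there.
        val tp = new TopicPartition("filler", 0)
        TestUtils.createTopic(zkClient, tp.topic,
          partitionReplicaAssignment = Map(tp.partition -> Seq(brokerId)), servers = servers)

        val broker = servers.find(_.config.brokerId == brokerId).get
        // Replace that partition's log directory with a file and wait until the broker marks it offline.
        TestUtils.causeLogDirFailure(TestUtils.Checkpoint, broker, tp)
      }
    }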