kafka-commits mailing list archives

From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6650: Allowing transition to OfflineReplica state for replicas without leadership info (#4825)
Date Tue, 17 Apr 2018 00:16:12 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 341db99  KAFKA-6650: Allowing transition to OfflineReplica state for replicas without leadership info (#4825)
341db99 is described below

commit 341db990dc4e2acb207622c7fa254f07650742bc
Author: gitlw <lucasatucla@gmail.com>
AuthorDate: Mon Apr 16 17:16:08 2018 -0700

    KAFKA-6650: Allowing transition to OfflineReplica state for replicas without leadership info (#4825)
    
    A partially deleted topic can end up with some partitions having no leadership info.
    For such a partially deleted topic, a new controller should be able to finish the topic deletion
    by transitioning the rogue partitions' replicas to the OfflineReplica state.
    This patch adds logic to transition replicas whose partitions have no leadership info
    to the OfflineReplica state.
    
    Added a new test method to cover the partially deleted topic case.
    
    Reviewers: Jun Rao <junrao@gmail.com>
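
    In outline, the change splits the valid replicas by whether their partition still
    has leadership info: the first group goes through the usual ISR-shrink path, and
    both groups end up in OfflineReplica state. A minimal, self-contained Scala sketch
    of that split, using simplified stand-ins for the controller's types (this
    TopicPartition, Replica, and leadership map are illustrative, not the real
    controller context):

        object PartitionSplitSketch extends App {
          case class TopicPartition(topic: String, partition: Int)
          case class Replica(topicPartition: TopicPartition, brokerId: Int)

          // Stand-in for controllerContext.partitionLeadershipInfo: partition 0's
          // znode was deleted by a crashed controller, so it has no entry here.
          val tp0 = TopicPartition("test", 0)
          val tp1 = TopicPartition("test", 1)
          val leadershipInfo: Map[TopicPartition, String] = Map(tp1 -> "leaderAndIsr")

          val validReplicas = Seq(Replica(tp0, brokerId = 0), Replica(tp1, brokerId = 0))

          // The core of the patch: partition instead of filter, so replicas whose
          // partitions lack leadership info are kept rather than silently dropped.
          val (withInfo, withoutInfo) =
            validReplicas.partition(r => leadershipInfo.contains(r.topicPartition))

          println(withInfo)    // ISR shrink first, then OfflineReplica
          println(withoutInfo) // no leader/ISR to update: straight to OfflineReplica
        }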
---
 .../kafka/controller/ReplicaStateMachine.scala     | 11 +++++-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  2 +-
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   | 43 +++++++++++++++++++---
 .../kafka/controller/ReplicaStateMachineTest.scala |  2 +-
 4 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index a2d04e6..5fafcc4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -202,8 +202,10 @@ class ReplicaStateMachine(config: KafkaConfig,
           controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition,
             deletePartition = false, (_, _) => ())
         }
-        val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicPartition))
-        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition))
+        val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
+          controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)
+        }
+        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
         updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
           if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
             val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
@@ -216,6 +218,11 @@ class ReplicaStateMachine(config: KafkaConfig,
           logSuccessfulTransition(replicaId, partition, replicaState(replica), OfflineReplica)
           replicaState.put(replica, OfflineReplica)
         }
+
+        replicasWithoutLeadershipInfo.foreach { replica =>
+          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), OfflineReplica)
+          replicaState.put(replica, OfflineReplica)
+        }
       case ReplicaDeletionStarted =>
         validReplicas.foreach { replica =>
           logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionStarted)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 9b58fc7..a65128a 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1370,7 +1370,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * @return true if path gets deleted successfully, false if root path doesn't exist
    * @throws KeeperException if there is an error while deleting the znodes
    */
-  private[zk] def deleteRecursive(path: String): Boolean = {
+  def deleteRecursive(path: String): Boolean = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
     getChildrenResponse.resultCode match {
       case Code.OK =>
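
    For reference, the change above only widens the visibility of deleteRecursive
    (private[zk] to public) so that the new test can remove a partition znode
    directly; the method body is unchanged. The pattern it implements, sketched
    here against the plain Apache ZooKeeper client rather than Kafka's internal
    ZooKeeperClient wrapper (so these calls illustrate the idea, not the actual
    implementation):

        import org.apache.zookeeper.{KeeperException, ZooKeeper}
        import scala.collection.JavaConverters._

        // Depth-first recursive delete: remove all children, then the node itself.
        // Returns false when the root path does not exist, mirroring the documented
        // contract of KafkaZkClient.deleteRecursive.
        def deleteRecursive(zk: ZooKeeper, path: String): Boolean =
          try {
            zk.getChildren(path, false).asScala
              .foreach(child => deleteRecursive(zk, s"$path/$child"))
            zk.delete(path, -1) // version -1 matches any version
            true
          } catch {
            case _: KeeperException.NoNodeException => false
          }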
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ef455d4..4c033c4 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -17,7 +17,7 @@
 package kafka.admin
 
 import kafka.log.Log
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
 import kafka.utils.TestUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.junit.Assert._
@@ -326,7 +326,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     brokerConfigs.head.setProperty("log.segment.bytes","100")
     brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
 
-    servers = createTestTopicAndCluster(topic,brokerConfigs)
+    servers = createTestTopicAndCluster(topic, brokerConfigs, expectedReplicaAssignment)
 
     // for simplicity, we are validating cleaner offsets on a single broker
     val server = servers.head
@@ -363,18 +363,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
   }
 
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = {
     val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
-    createTestTopicAndCluster(topic, brokerConfigs)
+    createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
 
-  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaServer] = {
     val topicPartition = new TopicPartition(topic, 0)
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, replicaAssignment)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created")
@@ -408,4 +408,35 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
   }
+
+  @Test
+  def testDeletingPartiallyDeletedTopic() {
+    /**
+      * A previous controller could have deleted some partitions of a topic from ZK, but not all partitions, and then crashed.
+      * In that case, the new controller should be able to handle the partially deleted topic, and finish the deletion.
+      */
+
+    val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
+    val topic = "test"
+    servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+
+    /**
+      * shutdown all brokers in order to create a partially deleted topic on ZK
+      */
+    servers.foreach(_.shutdown())
+
+    /**
+      * delete the partition znode at /brokers/topics/test/partitions/0
+      * to simulate the case that a previous controller crashed right after deleting the partition znode
+      */
+    zkClient.deleteRecursive(TopicPartitionZNode.path(new TopicPartition(topic, 0)))
+    adminZkClient.deleteTopic(topic)
+
+    /**
+      * start up all brokers and verify that topic deletion eventually finishes.
+      */
+    servers.foreach(_.startup())
+    TestUtils.waitUntilTrue(() => servers.exists(_.kafkaController.isActive), "No controller is elected")
+    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+  }
 }
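
    The znode the test removes is located via TopicPartitionZNode.path; for
    ("test", 0) that resolves under the topic's partitions subtree. A hypothetical
    helper showing the expected layout (illustrative only; the real path constants
    live in kafka.zk.ZkData):

        // Illustrative: what TopicPartitionZNode.path(new TopicPartition("test", 0))
        // is expected to produce.
        def topicPartitionPath(topic: String, partition: Int): String =
          s"/brokers/topics/$topic/partitions/$partition"

        assert(topicPartitionPath("test", 0) == "/brokers/topics/test/partitions/0")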
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 6a961a5..14d2df2 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -119,7 +119,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
     EasyMock.replay(mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
     EasyMock.verify(mockControllerBrokerRequestBatch)
-    assertEquals(NewReplica, replicaState(replica))
+    assertEquals(OfflineReplica, replicaState(replica))
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.
