kafka-commits mailing list archives

From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869)
Date Tue, 06 Nov 2018 23:29:07 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b5ffa0   KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869)
7b5ffa0 is described below

commit 7b5ffa0a070065e5e8320f481bbd8a3a26378f91
Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>
AuthorDate: Tue Nov 6 15:28:53 2018 -0800

     KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869)
    
    This PR avoids sending out a full UpdateMetadataRequest in the following scenarios:
    
    1. On broker startup, send a full UpdateMetadataRequest to newly added brokers and only send an UpdateMetadataRequest with empty partition states to existing brokers (see the sketch after this list).
    2. On broker failure, if it doesn't require leader election, only include the states of partitions that are hosted by the dead broker(s) in the UpdateMetadataRequest instead of including all partition states.
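    
    As a minimal sketch of scenario 1 (condensed from the KafkaController diff below; not a drop-in snippet), the broker-startup path now splits the update into two requests:
    
        val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
        // Existing brokers only need to learn about the new brokers, so no partition states are attached.
        sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
        // New brokers get the full set of partition states to initialize their metadata caches.
        sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)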
    
    This PR also introduces a minor optimization in the MetadataCache update to avoid copying the previous partition states upon receiving an UpdateMetadataRequest with no partition states, as sketched below.
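    
    A condensed sketch of that fast path (taken from the MetadataCache.scala diff below; controllerId, aliveBrokers, and aliveNodes are built earlier in updateMetadata):
    
        if (updateMetadataRequest.partitionStates().isEmpty) {
          // No partition state changes: reuse the previous snapshot's partition map instead of deep-copying it.
          metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)
        } else {
          // Partial update: deep-copy the previous partition states, then apply the changes as before.
        }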
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 .../controller/ControllerChannelManager.scala      |  7 +--
 .../scala/kafka/controller/KafkaController.scala   | 21 +++++----
 .../main/scala/kafka/server/MetadataCache.scala    | 44 ++++++++++--------
 .../controller/ControllerIntegrationTest.scala     | 53 ++++++++++++++++++++++
 4 files changed, 90 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 85da8b8..a11f553 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -383,13 +383,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
       }
     }
 
-    val givenPartitions = if (partitions.isEmpty)
-      controllerContext.partitionLeadershipInfo.keySet
-    else
-      partitions
-
     updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
-    givenPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
+    partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
       beingDeleted = controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic)))
   }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 740ab7f..a52f3f0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -254,7 +254,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
     // partitionStateMachine.startup().
     info("Sending update metadata request")
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
 
     replicaStateMachine.startup()
     partitionStateMachine.startup()
@@ -357,11 +357,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info(s"New broker startup callback for ${newBrokers.mkString(",")}")
     newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val newBrokersSet = newBrokers.toSet
-    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
-    // broker via this update.
+    val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
+    // Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers
+    // via this update. No need to include any partition states in the request since there are no partition state changes.
+    sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
+    // Send update metadata request to all the new brokers in the cluster with a full set of partition states for initialization.
     // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
-    // common controlled shutdown case, the metadata will reach the new brokers faster
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    // common controlled shutdown case, the metadata will reach the new brokers faster.
+    sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -421,7 +424,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private def onBrokerUpdate(updatedBrokerId: Int) {
     info(s"Broker info update callback for $updatedBrokerId")
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
   }
 
   /**
@@ -458,10 +461,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
     }
 
-    // If replica failure did not require leader re-election, inform brokers of the offline replica
+    // If replica failure did not require leader re-election, inform brokers of the offline brokers
     // Note that during leader re-election, brokers update their metadata
     if (partitionsWithoutLeader.isEmpty) {
-      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
     }
   }
 
@@ -887,7 +890,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    *
    * @param brokers The brokers that the update metadata request should be sent to
    */
-  private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition] = Set.empty[TopicPartition]) {
+  private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]) {
     try {
       brokerRequestBatch.newBatch()
       brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 3114663..3fefc7b 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -214,13 +214,6 @@ class MetadataCache(brokerId: Int) extends Logging {
   def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
     inWriteLock(partitionMetadataLock) {
 
-      //since kafka may do partial metadata updates, we start by copying the previous state
-      val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)
-      metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
-        val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)
-        copy ++= oldPartitionStates
-        partitionStates += (topic -> copy)
-      }
       val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
       val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
       val controllerId = updateMetadataRequest.controllerId match {
@@ -248,21 +241,32 @@ class MetadataCache(brokerId: Int) extends Logging {
       }
 
       val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
-      updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
-        val controllerId = updateMetadataRequest.controllerId
-        val controllerEpoch = updateMetadataRequest.controllerEpoch
-        if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
-          removePartitionInfo(partitionStates, tp.topic, tp.partition)
-          stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
-            s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
-          deletedPartitions += tp
-        } else {
-          addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
-          stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
-            s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
+      if (updateMetadataRequest.partitionStates().isEmpty) {
+        metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)
+      } else {
+        //since kafka may do partial metadata updates, we start by copying the previous state
+        val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)
+        metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
+          val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)
+          copy ++= oldPartitionStates
+          partitionStates += (topic -> copy)
+        }
+        updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
+          val controllerId = updateMetadataRequest.controllerId
+          val controllerEpoch = updateMetadataRequest.controllerEpoch
+          if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
+            removePartitionInfo(partitionStates, tp.topic, tp.partition)
+            stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
+              s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
+            deletedPartitions += tp
+          } else {
+            addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
+            stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
+              s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
+          }
         }
+        metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
       }
-      metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
       deletedPartitions
     }
   }
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index e2f50b9..c0eda3d 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -83,6 +83,59 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move")
   }
 
+  // This test case is used to ensure that there will be no correctness issue after we avoid sending out full
+  // UpdateMetadataRequest to all brokers in the cluster
+  @Test
+  def testMetadataPropagationOnBrokerChange(): Unit = {
+    servers = makeServers(3)
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    // Need to make sure the broker we shut down and start up is not the controller. Otherwise we will send out a
+    // full UpdateMetadataRequest to all brokers during controller failover.
+    val testBroker = servers.filter(e => e.config.brokerId != controllerId).head
+    val remainingBrokers = servers.filter(_.config.brokerId != testBroker.config.brokerId)
+    val topic = "topic1"
+    // Make sure shutting down the test broker will not require any leadership change, to test that we avoid sending
+    // out a full UpdateMetadataRequest on broker failure
+    val assignment = Map(
+      0 -> Seq(remainingBrokers(0).config.brokerId, testBroker.config.brokerId),
+      1 -> remainingBrokers.map(_.config.brokerId))
+
+    // Create topic
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+
+    // Shutdown the broker
+    testBroker.shutdown()
+    testBroker.awaitShutdown()
+    TestUtils.waitUntilBrokerMetadataIsPropagated(remainingBrokers)
+    remainingBrokers.foreach { server =>
+      val offlineReplicaPartitionInfo = server.metadataCache.getPartitionInfo(topic, 0).get
+      assertEquals(1, offlineReplicaPartitionInfo.offlineReplicas.size())
+      assertEquals(testBroker.config.brokerId, offlineReplicaPartitionInfo.offlineReplicas.get(0))
+      assertEquals(assignment(0).asJava, offlineReplicaPartitionInfo.basePartitionState.replicas)
+      assertEquals(Seq(remainingBrokers.head.config.brokerId).asJava, offlineReplicaPartitionInfo.basePartitionState.isr)
+      val onlinePartitionInfo = server.metadataCache.getPartitionInfo(topic, 1).get
+      assertEquals(assignment(1).asJava, onlinePartitionInfo.basePartitionState.replicas)
+      assertTrue(onlinePartitionInfo.offlineReplicas.isEmpty)
+    }
+
+    // Startup the broker
+    testBroker.startup()
+    TestUtils.waitUntilTrue( () => {
+      !servers.exists { server =>
+        assignment.exists { case (partitionId, replicas) =>
+          val partitionInfoOpt = server.metadataCache.getPartitionInfo(topic, partitionId)
+          if (partitionInfoOpt.isDefined) {
+            val partitionInfo = partitionInfoOpt.get
+            !partitionInfo.offlineReplicas.isEmpty || !partitionInfo.basePartitionState.replicas.asScala.equals(replicas)
+          } else {
+            true
+          }
+        }
+      }
+    }, "Inconsistent metadata after broker startup")
+  }
+
   @Test
   def testTopicCreation(): Unit = {
     servers = makeServers(1)

