kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: kafka-972; MetadataRequest returns stale list of brokers; patched by Ashish Singh; reviewed by Jun Rao
Date Tue, 14 Jul 2015 00:16:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 69b451e28 -> bdbb9672f


kafka-972; MetadataRequest returns stale list of brokers; patched by Ashish Singh; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bdbb9672
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bdbb9672
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bdbb9672

Branch: refs/heads/trunk
Commit: bdbb9672f5e035fd00801037e2affe64811ec6ab
Parents: 69b451e
Author: Ashish Singh <asingh@cloudera.com>
Authored: Mon Jul 13 17:16:34 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Jul 13 17:16:34 2015 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 21 +++++--
 .../kafka/integration/TopicMetadataTest.scala   | 66 ++++++++++++++++++--
 2 files changed, 77 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bdbb9672/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 20f1499..b4fc755 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -387,8 +387,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   /**
    * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
    * brokers as input. It does the following -
-   * 1. Triggers the OnlinePartition state change for all new/offline partitions
-   * 2. It checks whether there are reassigned replicas assigned to any newly started brokers. If
+   * 1. Sends update metadata request to all live and shutting down brokers
+   * 2. Triggers the OnlinePartition state change for all new/offline partitions
+   * 3. It checks whether there are reassigned replicas assigned to any newly started brokers. If
    *    so, it performs the reassignment logic for each topic/partition.
    *
    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
@@ -400,10 +401,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
     val newBrokersSet = newBrokers.toSet
-    // send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown
-    // leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the
-    // metadata will reach the new brokers faster
-    sendUpdateMetadataRequest(newBrokers)
+    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
+    // broker via this update.
+    // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
+    // common controlled shutdown case, the metadata will reach the new brokers faster
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -433,6 +435,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
    * 1. Mark partitions with dead leaders as offline
    * 2. Triggers the OnlinePartition state change for all new/offline partitions
    * 3. Invokes the OfflineReplica state change on the input list of newly started brokers
+   * 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers
    *
    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point.  This is because
    * the partition state machine will refresh our cache for us when performing leader election for all new/offline
@@ -464,6 +467,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
       // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
       deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
     }
+
+    // If broker failure did not require leader re-election, inform brokers of failed broker
+    // Note that during leader re-election, brokers update their metadata
+    if (partitionsWithoutLeader.isEmpty) {
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    }
   }
 
   /**
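
In short, the controller change above stops limiting UpdateMetadataRequest to the newly started brokers: on broker startup, and on broker failure when no partition loses its leader, the updated broker list is pushed to every live or shutting-down broker, so existing brokers no longer answer MetadataRequest with a stale broker list. Below is a minimal, self-contained sketch of that rule; BrokerContext and ControllerSketch are hypothetical stand-ins invented for illustration, not Kafka's actual controller types.

// Minimal sketch of the controller-side change, assuming simplified stand-in
// types; BrokerContext and ControllerSketch are hypothetical, not Kafka APIs.
case class BrokerContext(liveOrShuttingDownBrokerIds: Set[Int])

class ControllerSketch(ctx: BrokerContext,
                       sendUpdateMetadataRequest: Seq[Int] => Unit) {

  // Before the patch, only the newly started brokers were sent metadata,
  // so existing brokers kept serving a stale broker list.
  def onBrokerStartup(newBrokers: Seq[Int]): Unit = {
    // After the patch, every live or shutting-down broker learns about the
    // new broker, not just the new broker itself.
    sendUpdateMetadataRequest(ctx.liveOrShuttingDownBrokerIds.toSeq)
  }

  def onBrokerFailure(deadBrokers: Seq[Int], partitionsWithoutLeader: Set[String]): Unit = {
    // Leader re-election already refreshes metadata on the surviving brokers;
    // only when no partition lost its leader is an explicit broadcast needed.
    if (partitionsWithoutLeader.isEmpty)
      sendUpdateMetadataRequest(ctx.liveOrShuttingDownBrokerIds.toSeq)
  }
}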

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdbb9672/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index a95ee5e..5b6c9d6 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -25,7 +25,7 @@ import kafka.api.{TopicMetadataResponse, TopicMetadataRequest}
 import kafka.client.ClientUtils
 import kafka.cluster.{Broker, BrokerEndPoint}
 import kafka.common.ErrorMapping
-import kafka.server.{NotRunning, KafkaConfig, KafkaServer}
+import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
@@ -36,7 +36,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   private var server1: KafkaServer = null
   var brokerEndPoints: Seq[BrokerEndPoint] = null
   var adHocConfigs: Seq[KafkaConfig] = null
-  val numConfigs: Int = 2
+  val numConfigs: Int = 4
 
   override def setUp() {
     super.setUp()
@@ -171,13 +171,15 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
 
 
   def testIsrAfterBrokerShutDownAndJoinsBack {
+    val numBrokers = 2 //just 2 brokers are enough for the test
+
     // start adHoc brokers
-    val adHocServers = adHocConfigs.map(p => createServer(p))
+    val adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
     val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
 
     // create topic
     val topic: String = "test"
-    AdminUtils.createTopic(zkClient, topic, 1, numConfigs)
+    AdminUtils.createTopic(zkClient, topic, 1, numBrokers)
 
     // shutdown a broker
     adHocServers.last.shutdown()
@@ -192,4 +194,60 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     // shutdown adHoc brokers
     adHocServers.map(p => p.shutdown())
   }
+
+  private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
+    var topicMetadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
+
+    // Get topic metadata from old broker
+    // Wait for metadata to get updated by checking metadata from a new broker
+    waitUntilTrue(() => {
+    topicMetadata = ClientUtils.fetchTopicMetadata(
+      Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
+    topicMetadata.brokers.size == expectedBrokersCount},
+      "Alive brokers list is not correctly propagated by coordinator to brokers"
+    )
+
+    // Assert that topic metadata at new brokers is updated correctly
+    servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
+      waitUntilTrue(() =>
+        topicMetadata == ClientUtils.fetchTopicMetadata(
+          Set.empty,
+          Seq(new Broker(x.config.brokerId,
+            x.config.hostName,
+            x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+          "TopicMetadataTest-testBasicTopicMetadata",
+          2000, 0), "Topic metadata is not correctly updated"))
+  }
+
+
+  def testAliveBrokerListWithNoTopics {
+    checkMetadata(Seq(server1), 1)
+  }
+
+  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
+    var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
+
+    checkMetadata(adHocServers, numConfigs - 1)
+
+    // Add a broker
+    adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
+
+    checkMetadata(adHocServers, numConfigs)
+    adHocServers.map(p => p.shutdown())
+  }
+
+
+  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
+    val adHocServers = adHocConfigs.map(p => createServer(p))
+
+    checkMetadata(adHocServers, numConfigs)
+
+    // Shutdown a broker
+    adHocServers.last.shutdown()
+    adHocServers.last.awaitShutdown()
+
+    checkMetadata(adHocServers, numConfigs - 1)
+
+    adHocServers.map(p => p.shutdown())
+  }
 }
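
The new checkMetadata helper polls until every running broker advertises the expected broker count. As a rough, self-contained illustration of that poll-until-converged pattern, the sketch below uses a hand-rolled waitUntilTrue and a hypothetical fetchBrokerCount stand-in rather than the TestUtils/ClientUtils calls that appear in the diff.

// Self-contained sketch of the poll-until-converged pattern used by checkMetadata;
// fetchBrokerCount is a stand-in for fetching topic metadata from a live broker.
object MetadataPollSketch {
  def waitUntilTrue(condition: () => Boolean,
                    msg: String,
                    timeoutMs: Long = 15000L,
                    pauseMs: Long = 100L): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!condition()) {
      if (System.currentTimeMillis() > deadline)
        throw new AssertionError(msg)
      Thread.sleep(pauseMs)
    }
  }

  def main(args: Array[String]): Unit = {
    // Pretend a broker joins after ~300 ms; the poll succeeds once the
    // advertised broker count (here just a stubbed value) reaches 2.
    val start = System.currentTimeMillis()
    def fetchBrokerCount(): Int = if (System.currentTimeMillis() - start > 300) 2 else 1

    waitUntilTrue(() => fetchBrokerCount() == 2,
      "Alive brokers list was not propagated to all brokers in time")
    println("broker list converged")
  }
}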

