kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3640: Reduce the latency of topic metadata requests
Date Mon, 02 May 2016 18:20:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 5f316a58a -> 51f981b5b


KAFKA-3640: Reduce the latency of topic metadata requests

Changes based on KAFKA-2073 to reduce the latency of metadata requests.

Author: Flavio Junqueira <fpj@apache.org>

Reviewers: Gwen Shapira

Closes #1303 from fpj/KAFKA-3640


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51f981b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51f981b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51f981b5

Branch: refs/heads/0.9.0
Commit: 51f981b5b0dc1bba919892ddaae09ce5832603a8
Parents: 5f316a5
Author: Flavio Junqueira <fpj@apache.org>
Authored: Mon May 2 11:20:52 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon May 2 11:20:52 2016 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 147 +++++++++++--------
 .../main/scala/kafka/server/MetadataCache.scala | 139 +++++++++++-------
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 3 files changed, 169 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/51f981b5/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d30bc2d..0154f14 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
-import java.util
+import java.util.Properties
 
 import kafka.admin.AdminUtils
 import kafka.api._
@@ -39,6 +39,8 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.Node
 
 import scala.collection._
+import scala.collection.JavaConverters._
+
 /**
  * Logic to handle the various Kafka requests
  */
@@ -227,7 +229,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case (topicAndPartition, metaAndError) => {
           val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
           try {
-            if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
+            if (!metadataCache.hasTopicMetadata(topicAndPartition.topic)) {
               (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
             } else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
               (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
@@ -548,45 +550,56 @@ class KafkaApis(val requestChannel: RequestChannel,
     ret.toSeq.sortBy(- _)
   }
 
+  private def createTopic(topic: String,
+                          numPartitions: Int,
+                          replicationFactor: Int,
+                          properties: Properties = new Properties()): TopicMetadata = {
+    try {
+      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties)
+      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
+        .format(topic, numPartitions, replicationFactor))
+      new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code)
+    } catch {
+      case e: TopicExistsException => // let it go, possibly another broker created this topic
+        new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.LEADER_NOT_AVAILABLE.code)
+      case itex: InvalidTopicException =>
+        new TopicMetadata(topic, Seq.empty[PartitionMetadata], Errors.INVALID_TOPIC_EXCEPTION.code)
+    }
+  }
+
+  private def createGroupMetadataTopic(): TopicMetadata = {
+    val aliveBrokers = metadataCache.getAliveBrokers
+    val offsetsTopicReplicationFactor =
+      if (aliveBrokers.nonEmpty)
+        Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
+      else
+        config.offsetsTopicReplicationFactor.toInt
+    createTopic(GroupCoordinator.GroupMetadataTopicName, config.offsetsTopicPartitions,
+      offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
+  }
+
+  private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): TopicMetadata = {
+    val topicMetadata = metadataCache.getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), securityProtocol)
+    topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
+  }
+
   private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol): Seq[TopicMetadata] = {
     val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol)
-    if (topics.size > 0 && topicResponses.size != topics.size) {
+    if (topics.isEmpty || topicResponses.size == topics.size) {
+      topicResponses
+    } else {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == GroupCoordinator.GroupMetadataTopicName || config.autoCreateTopicsEnable) {
-          try {
-            if (topic == GroupCoordinator.GroupMetadataTopicName) {
-              val aliveBrokers = metadataCache.getAliveBrokers
-              val offsetsTopicReplicationFactor =
-                if (aliveBrokers.length > 0)
-                  Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
-                else
-                  config.offsetsTopicReplicationFactor.toInt
-              AdminUtils.createTopic(zkUtils, topic, config.offsetsTopicPartitions,
-                                     offsetsTopicReplicationFactor,
-                                     coordinator.offsetsTopicConfigs)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor))
-            }
-            else {
-              AdminUtils.createTopic(zkUtils, topic, config.numPartitions, config.defaultReplicationFactor)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                   .format(topic, config.numPartitions, config.defaultReplicationFactor))
-            }
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-          } catch {
-            case e: TopicExistsException => // let it go, possibly another broker created this topic
-              new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-            case itex: InvalidTopicException =>
-              new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.InvalidTopicCode)
-          }
+        if (topic == GroupCoordinator.GroupMetadataTopicName) {
+          createGroupMetadataTopic()
+        } else if (config.autoCreateTopicsEnable) {
+          createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
           new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
         }
       }
-      topicResponses.appendAll(responsesForNonExistentTopics)
+      topicResponses ++ responsesForNonExistentTopics
     }
-    topicResponses
   }
 
   /**
@@ -595,25 +608,23 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
 
-    //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
-    val topics = if (metadataRequest.topics.isEmpty) {
-      val topicResponses = metadataCache.getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol)
-      topicResponses.map(_.topic).filter(topic => authorize(request.session, Describe, new Resource(Topic, topic))).toSet
+    val topics = metadataRequest.topics.toSet
+    var (authorizedTopics, unauthorizedTopics) = if (metadataRequest.topics.isEmpty) {
+      //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
+      val authorized = metadataCache.getAllTopics()
+        .filter(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
+      (authorized, mutable.Set[String]())
     } else {
-      metadataRequest.topics.toSet
+      topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
     }
 
-    //when topics is empty this will be a duplicate authorization check but given this should just be a cache lookup, it should not matter.
-    var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))
-
-    if (!authorizedTopics.isEmpty) {
-      val topicResponses = metadataCache.getTopicMetadata(authorizedTopics, request.securityProtocol)
-      if (config.autoCreateTopicsEnable && topicResponses.size != authorizedTopics.size) {
-        val nonExistentTopics: Set[String] = topics -- topicResponses.map(_.topic).toSet
-        authorizer.foreach {
-          az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
-            authorizedTopics --= nonExistentTopics
-            unauthorizedTopics ++= nonExistentTopics
+    if (authorizedTopics.nonEmpty) {
+      val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
+      if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
+        authorizer.foreach { az =>
+          if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
+            authorizedTopics --= nonExistingTopics
+            unauthorizedTopics ++= nonExistingTopics
           }
         }
       }
@@ -621,9 +632,14 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode))
 
-    val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
+    val topicMetadata = if (authorizedTopics.isEmpty)
+      Seq.empty[TopicMetadata]
+    else
+      getTopicMetadata(authorizedTopics, request.securityProtocol)
     val brokers = metadataCache.getAliveBrokers
-    trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
+
+    trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","),
+      brokers.mkString(","), request.header.correlationId, request.header.clientId))
     val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata  ++ unauthorizedTopicMetaData, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
   }
@@ -655,7 +671,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responseInfo = authorizedTopicPartitions.map( topicAndPartition => {
         val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic)
         try {
-          if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
+          if (!metadataCache.hasTopicMetadata(topicAndPartition.topic)) {
             (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
           } else {
             val payloadOpt = zkUtils.readDataMaybeNull(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1
@@ -699,16 +715,23 @@ class KafkaApis(val requestChannel: RequestChannel,
       val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
 
       // get metadata (and create the topic if necessary)
-      val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
-      val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
-        partitionMetadata => partitionMetadata.leader
-      }
+      val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
 
-      val responseBody = coordinatorEndpoint match {
-        case None =>
-          new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode())
-        case Some(endpoint) =>
-          new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
+      val responseBody = if (offsetsTopicMetadata.errorCode != Errors.NONE.code) {
+        new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
+      } else {
+        val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata
+          .find(_.partitionId == partition)
+          .flatMap {
+            partitionMetadata => partitionMetadata.leader
+          }
+
+        coordinatorEndpoint match {
+          case Some(endpoint) =>
+            new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
+          case _ =>
+            new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
+        }
       }
 
       trace("Sending consumer metadata %s for correlation id %d to client %s."
@@ -718,8 +741,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDescribeGroupRequest(request: RequestChannel.Request) {
-    import JavaConverters._
-
     val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest]
     val responseHeader = new ResponseHeader(request.header.correlationId)
 
@@ -744,8 +765,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleListGroupsRequest(request: RequestChannel.Request) {
-    import JavaConverters._
-
     val responseHeader = new ResponseHeader(request.header.correlationId)
     val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
       ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)

http://git-wip-us.apache.org/repos/asf/kafka/blob/51f981b5/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 9a9205f..f057e15 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,14 +17,16 @@
 
 package kafka.server
 
-import kafka.cluster.{BrokerEndPoint,Broker}
-import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
+import kafka.cluster._
 import kafka.common.TopicAndPartition
 
 import kafka.api._
-import kafka.controller.KafkaController.StateChangeLogger
-import org.apache.kafka.common.protocol.SecurityProtocol
-import scala.collection.{Seq, Set, mutable}
+import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
+import org.apache.kafka.common.Node
+
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import scala.collection.{mutable, Seq, Set}
+import scala.collection.JavaConverters._
 import kafka.utils.Logging
 import kafka.utils.CoreUtils._
 
@@ -35,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
 private[server] class MetadataCache(brokerId: Int) extends Logging {
+  private val stateChangeLogger = KafkaController.stateChangeLogger
   private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
     new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
   private var aliveBrokers: Map[Int, Broker] = Map()
@@ -42,63 +45,93 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
 
   this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId)
 
-  def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol) = {
+  private def getAliveEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol): Seq[BrokerEndPoint] = {
+    val result = new mutable.ArrayBuffer[BrokerEndPoint](math.min(aliveBrokers.size, brokers.size))
+    brokers.foreach { brokerId =>
+      getAliveEndpoint(brokerId, protocol).foreach(result +=)
+    }
+    result
+  }
+
+  private def getAliveEndpoint(brokerId: Int, protocol: SecurityProtocol): Option[BrokerEndPoint] =
+    aliveBrokers.get(brokerId).map(_.getBrokerEndPoint(protocol))
+
+  private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[PartitionMetadata]] = {
+    cache.get(topic).map { partitions =>
+      partitions.map { case (partitionId, partitionState) =>
+        val topicPartition = TopicAndPartition(topic, partitionId)
+
+        val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+        val maybeLeader = aliveBrokers.get(leaderAndIsr.leader)
+
+        val replicas = partitionState.allReplicas
+        val replicaInfo = getAliveEndpoints(replicas, protocol)
+
+        maybeLeader match {
+          case None =>
+            debug("Error while fetching metadata for %s: leader not available".format(topicPartition))
+            new PartitionMetadata(partitionId, None, replicaInfo, Seq.empty[BrokerEndPoint],
+              Errors.LEADER_NOT_AVAILABLE.code)
+
+          case Some(leader) =>
+            val isr = leaderAndIsr.isr
+            val isrInfo = getAliveEndpoints(isr, protocol)
+
+            if (replicaInfo.size < replicas.size) {
+              debug("Error while fetching metadata for %s: replica information not available for following brokers %s"
+                .format(topicPartition, replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")))
+
+              new PartitionMetadata(partitionId, Some(leader.getBrokerEndPoint(protocol)), replicaInfo, isrInfo, Errors.REPLICA_NOT_AVAILABLE.code)
+            } else if (isrInfo.size < isr.size) {
+              debug("Error while fetching metadata for %s: in sync replica information not available for following brokers %s"
+                .format(topicPartition, isr.filterNot(isrInfo.map(_.id).contains).mkString(",")))
+              new PartitionMetadata(partitionId, Some(leader.getBrokerEndPoint(protocol)), replicaInfo, isrInfo, Errors.REPLICA_NOT_AVAILABLE.code)
+            } else {
+              new PartitionMetadata(partitionId, Some(leader.getBrokerEndPoint(protocol)), replicaInfo, isrInfo, Errors.NONE.code)
+            }
+        }
+      }
+    }
+  }
 
-    val isAllTopics = topics.isEmpty
-    val topicsRequested = if(isAllTopics) cache.keySet else topics
-    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+  def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol): Seq[TopicMetadata] = {
     inReadLock(partitionMetadataLock) {
-      for (topic <- topicsRequested) {
-        if (isAllTopics || cache.contains(topic)) {
-          val partitionStateInfos = cache(topic)
-          val partitionMetadata = partitionStateInfos.map {
-            case (partitionId, partitionState) =>
-              val replicas = partitionState.allReplicas
-              val replicaInfo: Seq[BrokerEndPoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol))
-              var leaderInfo: Option[BrokerEndPoint] = None
-              var leaderBrokerInfo: Option[Broker] = None
-              var isrInfo: Seq[BrokerEndPoint] = Nil
-              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              val topicPartition = TopicAndPartition(topic, partitionId)
-              try {
-                leaderBrokerInfo = aliveBrokers.get(leader)
-                if (!leaderBrokerInfo.isDefined)
-                  throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition))
-                else
-                  leaderInfo = Some(leaderBrokerInfo.get.getBrokerEndPoint(protocol))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).map(_.getBrokerEndPoint(protocol))
-                if (replicaInfo.size < replicas.size)
-                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                if (isrInfo.size < isr.size)
-                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-              } catch {
-                case e: Throwable =>
-                  debug("Error while fetching metadata for %s: %s".format(topicPartition, e.toString))
-                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
-                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-              }
-          }
-          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+      val topicsRequested = if (topics.isEmpty) cache.keySet else topics
+      topicsRequested.toSeq.flatMap { topic =>
+        getPartitionMetadata(topic, protocol).map { partitionMetadata =>
+          new TopicMetadata(topic, partitionMetadata.toBuffer, Errors.NONE.code)
         }
       }
     }
-    topicResponses
   }
 
-  def getAliveBrokers = {
+  def hasTopicMetadata(topic: String): Boolean = {
+    inReadLock(partitionMetadataLock) {
+      cache.contains(topic)
+    }
+  }
+
+  def getAllTopics(): Set[String] = {
+    inReadLock(partitionMetadataLock) {
+      cache.keySet.toSet
+    }
+  }
+
+  def getNonExistingTopics(topics: Set[String]): Set[String] = {
+    inReadLock(partitionMetadataLock) {
+      topics -- cache.keySet
+    }
+  }
+
+  def getAliveBrokers: Seq[Broker] = {
     inReadLock(partitionMetadataLock) {
-      aliveBrokers.values.toSeq
+      aliveBrokers.values.toBuffer
     }
   }
 
-  def addOrUpdatePartitionInfo(topic: String,
-                               partitionId: Int,
-                               stateInfo: PartitionStateInfo) {
+  private def addOrUpdatePartitionInfo(topic: String,
+                                       partitionId: Int,
+                                       stateInfo: PartitionStateInfo) {
     inWriteLock(partitionMetadataLock) {
       cache.get(topic) match {
         case Some(infos) => infos.put(partitionId, stateInfo)
@@ -120,9 +153,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  def updateCache(updateMetadataRequest: UpdateMetadataRequest,
-                  brokerId: Int,
-                  stateChangeLogger: StateChangeLogger) {
+  def updateCache(updateMetadataRequest: UpdateMetadataRequest) {
     inWriteLock(partitionMetadataLock) {
       aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
       updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/51f981b5/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7a99aad..6effc62 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -573,7 +573,7 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.warn(stateControllerEpochErrorMessage)
         throw new ControllerMovedException(stateControllerEpochErrorMessage)
       } else {
-        metadataCache.updateCache(updateMetadataRequest, localBrokerId, stateChangeLogger)
+        metadataCache.updateCache(updateMetadataRequest)
         controllerEpoch = updateMetadataRequest.controllerEpoch
       }
     }


Mime
View raw message