kafka-commits mailing list archives

From jjko...@apache.org
Subject git commit: KAFKA-1356 Topic metadata requests take too long to process; reviewed by Joel Koshy, Neha Narkhede, Jun Rao and Guozhang Wang
Date Thu, 17 Apr 2014 17:18:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.1 3c4ca854f -> 82f4a8e1c


KAFKA-1356 Topic metadata requests take too long to process; reviewed by Joel Koshy, Neha Narkhede, Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82f4a8e1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82f4a8e1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82f4a8e1

Branch: refs/heads/0.8.1
Commit: 82f4a8e1c05a4ef197ef94e6f09c3132f5628037
Parents: 3c4ca85
Author: Timothy Chen <tnachen@gmail.com>
Authored: Thu Apr 17 10:18:27 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Apr 17 10:18:27 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/api/TopicMetadata.scala    |   8 +-
 .../kafka/controller/KafkaController.scala      |   1 -
 .../src/main/scala/kafka/server/KafkaApis.scala | 260 +++++++++++--------
 .../test/scala/unit/kafka/admin/AdminTest.scala |  10 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   2 +-
 6 files changed, 160 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
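
The bulk of the change is in KafkaApis.scala: the flat metadataCache keyed by TopicAndPartition is
replaced by a MetadataCache class that nests partition state under its topic and guards it with a
ReentrantReadWriteLock, so serving a topic metadata request no longer scans every cached partition.
A rough sketch of the difference, using stand-in types rather than the committed code:

    import scala.collection.mutable

    // Stand-ins for kafka.common.TopicAndPartition and kafka.api.PartitionStateInfo.
    case class TopicAndPartition(topic: String, partition: Int)
    case class PartitionStateInfo(leader: Int)

    // Old layout: one flat map; each topic metadata request filters the whole map.
    val flatCache = new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
    def oldLookup(topic: String) =
      flatCache.filter { case (tp, _) => tp.topic == topic }    // walks every cached partition

    // New layout: partition state nested under its topic; one hash lookup per topic.
    val nestedCache = new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
    def newLookup(topic: String) =
      nestedCache.getOrElse(topic, mutable.Map.empty[Int, PartitionStateInfo])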


http://git-wip-us.apache.org/repos/asf/kafka/blob/82f4a8e1/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 0513a59..51380a6 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -32,9 +32,11 @@ object TopicMetadata {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
-    val partitionsMetadata = new ArrayBuffer[PartitionMetadata]()
-    for(i <- 0 until numPartitions)
-      partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers)
+    val partitionsMetadata: Array[PartitionMetadata] = new Array[PartitionMetadata](numPartitions)
+    for(i <- 0 until numPartitions) {
+      val partitionMetadata = PartitionMetadata.readFrom(buffer, brokers)
+      partitionsMetadata(partitionMetadata.partitionId) = partitionMetadata
+    }
     new TopicMetadata(topic, partitionsMetadata, errorCode)
   }
 }
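
The readFrom change above sizes an Array up front and drops each PartitionMetadata into the slot
matching its partitionId instead of appending to an ArrayBuffer, so the deserialized partition list
is indexed and ordered by partition id regardless of the order the partitions appear on the wire
(the server-side sortWith on partitions is removed in the KafkaApis.scala changes below). An
illustrative sketch with a stand-in type, assuming, as the array sizing already does, that ids run
from 0 to numPartitions - 1:

    // Stand-in for kafka.api.PartitionMetadata, for illustration only.
    case class PartitionMeta(partitionId: Int, leader: Int)

    val wireOrder = Seq(PartitionMeta(2, 0), PartitionMeta(0, 1), PartitionMeta(1, 0))
    val byId = new Array[PartitionMeta](wireOrder.size)
    wireOrder.foreach(pm => byId(pm.partitionId) = pm)    // slot index == partition id
    // byId.map(_.partitionId).toSeq == Seq(0, 1, 2), whatever the wire order was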

http://git-wip-us.apache.org/repos/asf/kafka/blob/82f4a8e1/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d6c0321..e225226 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -34,7 +34,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
-import org.apache.log4j.Logger
 import java.util.concurrent.locks.ReentrantLock
 import scala.Some
 import kafka.common.TopicAndPartition

http://git-wip-us.apache.org/repos/asf/kafka/blob/82f4a8e1/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0f137c5..b9d2260 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -33,7 +33,10 @@ import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
 import kafka.cluster.Broker
 import kafka.controller.KafkaController
-
+import kafka.utils.Utils.inLock
+import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.controller.KafkaController.StateChangeLogger
 
 /**
  * Logic to handle the various Kafka requests
@@ -52,12 +55,130 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val delayedRequestMetrics = new DelayedRequestMetrics
   /* following 3 data structures are updated by the update metadata request
   * and is queried by the topic metadata request. */
-  var metadataCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
-    new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
-  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private val partitionMetadataLock = new Object
+  var metadataCache = new MetadataCache
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
+  class MetadataCache {
+    private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
+      new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
+
+    private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+    private val partitionMetadataLock = new ReentrantReadWriteLock()
+
+    def getTopicMetadata(topics: Set[String]): Tuple2[mutable.ListBuffer[TopicMetadata], mutable.ListBuffer[String]] = {
+      val isAllTopics = topics.isEmpty
+      val topicsRequested = if(isAllTopics) cache.keySet else topics
+      val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+      val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
+      inLock(partitionMetadataLock.readLock()) {
+        for (topic <- topicsRequested) {
+          if (isAllTopics || this.containsTopic(topic)) {
+            val partitionStateInfos = cache(topic)
+            val partitionMetadata = partitionStateInfos.map { case (partitionId, partitionState) =>
+              val replicas = partitionState.allReplicas
+              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+              var leaderInfo: Option[Broker] = None
+              var isrInfo: Seq[Broker] = Nil
+              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+              debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " +
replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+              try {
+                leaderInfo = aliveBrokers.get(leader)
+                if (!leaderInfo.isDefined)
+                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
+                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                if (replicaInfo.size < replicas.size)
+                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                if (isrInfo.size < isr.size)
+                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+              } catch {
+                case e: Throwable =>
+                  debug("Error while fetching metadata for topic %s partition %s. Possible
cause: %s".format(topic, partitionId, e.getMessage))
+                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              }
+            }
+            topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+          } else if (config.autoCreateTopicsEnable) {
+            topicsToBeCreated += topic
+          } else {
+            topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+          }
+        }
+      }
+      (topicResponses, topicsToBeCreated)
+    }
+
+    def addPartitionInfo(topic: String,
+                         partitionId: Int,
+                         stateInfo: PartitionStateInfo) {
+      inLock(partitionMetadataLock.writeLock()) {
+        addPartitionInfoInternal(topic, partitionId, stateInfo)
+      }
+    }
+
+    def getPartitionInfos(topic: String) = {
+      inLock(partitionMetadataLock.readLock()) {
+        cache(topic)
+      }
+    }
+
+    def containsTopicAndPartition(topic: String,
+                                  partitionId: Int): Boolean = {
+      inLock(partitionMetadataLock.readLock()) {
+        cache.get(topic) match {
+          case Some(partitionInfos) => partitionInfos.contains(partitionId)
+          case None => false
+        }
+      }
+    }
+
+    def containsTopic(topic: String) = cache.contains(topic)
+
+    def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+                    brokerId: Int,
+                    stateChangeLogger: StateChangeLogger) {
+      inLock(partitionMetadataLock.writeLock()) {
+        updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+        updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
+          addPartitionInfoInternal(partitionState._1.topic, partitionState._1.partition, partitionState._2)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
+              updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        }
+        // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
+        // currently being deleted by the controller
+        val topicsKnownToThisBroker = cache.keySet
+        val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map {
+          case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
+        val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
+        deletedTopics.foreach { topic =>
+          cache.remove(topic)
+          stateChangeLogger.trace(("Broker %d deleted partitions for topic %s from metadata cache in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d").format(brokerId, topic,
+              updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        }
+      }
+    }
+
+    private def addPartitionInfoInternal(topic: String,
+                                         partitionId: Int,
+                                         stateInfo: PartitionStateInfo) {
+      cache.get(topic) match {
+        case Some(infos) => infos.put(partitionId, stateInfo)
+        case None => {
+          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
+          cache.put(topic, newInfos)
+          newInfos.put(partitionId, stateInfo)
+        }
+      }
+    }
+  }
+
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
@@ -87,7 +208,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   // ensureTopicExists is only for client facing requests
   private def ensureTopicExists(topic: String) = {
-    if(!metadataCache.exists { case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic.equals(topic)} )
+    if (!metadataCache.containsTopic(topic))
       throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
   }
 
@@ -132,33 +253,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       stateChangeLogger.warn(stateControllerEpochErrorMessage)
       throw new ControllerMovedException(stateControllerEpochErrorMessage)
     }
-    partitionMetadataLock synchronized {
-      replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
+    replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
       // cache the list of alive brokers in the cluster
-      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
-      updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
-        metadataCache.put(partitionState._1, partitionState._2)
-        stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
-          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-      }
-      // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
-      // currently being deleted by the controller
-      val topicsKnownToThisBroker = metadataCache.map {
-        case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
-      val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map {
-        case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
-      val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
-      val partitionsToBeDeleted = metadataCache.filter {
-        case(topicAndPartition, partitionStateInfo) => deletedTopics.contains(topicAndPartition.topic)
-      }.keySet
-      partitionsToBeDeleted.foreach { partition =>
-        metadataCache.remove(partition)
-        stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
-          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-      }
-    }
+    metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
   }
@@ -550,89 +647,26 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
-    val config = replicaManager.config
-    var uniqueTopics = Set.empty[String]
-    uniqueTopics = {
-      if(metadataRequest.topics.size > 0)
-        metadataRequest.topics.toSet
-      else {
-        partitionMetadataLock synchronized {
-          metadataCache.keySet.map(_.topic)
-        }
-      }
-    }
-    val topicMetadataList =
-      partitionMetadataLock synchronized {
-        uniqueTopics.map { topic =>
-          if(metadataCache.keySet.map(_.topic).contains(topic)) {
-            debug("Topic %s exists in metadata cache on broker %d".format(topic, config.brokerId))
-            val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
-            val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
-            val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
-              val replicas = metadataCache(topicAndPartition).allReplicas
-              var replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-              var leaderInfo: Option[Broker] = None
-              var isrInfo: Seq[Broker] = Nil
-              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-              try {
-                if(aliveBrokers.keySet.contains(leader))
-                  leaderInfo = Some(aliveBrokers(leader))
-                else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-                if(replicaInfo.size < replicas.size)
-                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                if(isrInfo.size < isr.size)
-                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-              } catch {
-                case e: Throwable =>
-                  error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
-                  new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
-                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-              }
-            }
-            new TopicMetadata(topic, partitionMetadata)
-          } else {
-            debug("Topic %s does not exist in metadata cache on broker %d".format(topic, config.brokerId))
-            // topic doesn't exist, send appropriate error code
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
-          }
-        }
-      }
+    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
+    trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
+    val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
 
-    // handle auto create topics
-    topicMetadataList.foreach { topicMetadata =>
-      topicMetadata.errorCode match {
-        case ErrorMapping.NoError => topicsMetadata += topicMetadata
-        case ErrorMapping.UnknownTopicOrPartitionCode =>
-          if (config.autoCreateTopicsEnable) {
-            try {
-              AdminUtils.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
-            } catch {
-              case e: TopicExistsException => // let it go, possibly another broker created this topic
-            }
-            topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
-          } else {
-            debug("Auto create topic skipped for %s".format(topicMetadata.topic))
-            topicsMetadata += topicMetadata
-          }
-        case _ =>
-          debug("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,
-            ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName))
-          topicsMetadata += topicMetadata
+  private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
+    val (topicResponses, topicsToBeCreated) = metadataCache.getTopicMetadata(topics)
+
+    topicResponses.appendAll(topicsToBeCreated.map { topic =>
+      try {
+        AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+        info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+      } catch {
+        case e: TopicExistsException => // let it go, possibly another broker created this topic
       }
-    }
-    trace("Sending topic metadata %s for correlation id %d to client %s".format(topicsMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
-    val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+    })
+
+    topicResponses
   }
 
   /* 
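
The MetadataCache introduced above serializes access through kafka.utils.Utils.inLock and a
ReentrantReadWriteLock instead of the previous coarse object monitor, so many topic metadata reads
can proceed concurrently while UpdateMetadata requests take the write lock exclusively. A stand-in
sketch of that loan-style locking pattern, written for this note rather than taken from the
project's Utils:

    import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
    import scala.collection.mutable

    // Acquire the lock, run the body, always release; mirrors how inLock is used above.
    def inLock[T](lock: Lock)(body: => T): T = {
      lock.lock()
      try body finally lock.unlock()
    }

    val rwLock = new ReentrantReadWriteLock()
    val cache = new mutable.HashMap[String, Int]()

    // Writers (e.g. an UpdateMetadata request) hold the write lock exclusively.
    def put(topic: String, partitions: Int): Unit =
      inLock(rwLock.writeLock()) { cache(topic) = partitions }

    // Readers (e.g. a topic metadata request) share the read lock.
    def partitionsOf(topic: String): Option[Int] =
      inLock(rwLock.readLock()) { cache.get(topic) }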

http://git-wip-us.apache.org/repos/asf/kafka/blob/82f4a8e1/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index d5644ea..00b17c4 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -320,9 +320,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
       assertEquals(0, partitionsRemaining.size)
-      var partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
+      var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
       assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -331,15 +331,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
-      partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
+      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
 
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/82f4a8e1/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1317b4c..a6b4b2d 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -93,7 +93,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -164,7 +164,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/82f4a8e1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 33ced14..7cf7257 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -523,7 +523,7 @@ object TestUtils extends Logging {
   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
     Assert.assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
       TestUtils.waitUntilTrue(() =>
-        servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
+        servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout))
   }
   
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {

