kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject git commit: KAFKA-1356 Topic metadata requests takes too long to process; reviewed by Neha Narkhede, Joel Koshy and Guozhang Wang
Date Tue, 08 Apr 2014 17:44:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8f94bc331 -> 0e1d853c0


KAFKA-1356 Topic metadata requests takes too long to process; reviewed by Neha Narkhede, Joel Koshy and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e1d853c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e1d853c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e1d853c

Branch: refs/heads/trunk
Commit: 0e1d853c04cb2fe86cf98f0b1006a96e1bb1e177
Parents: 8f94bc3
Author: Timothy Chen <tnachen@gmail.com>
Authored: Tue Apr 8 10:44:06 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Apr 8 10:44:07 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/api/TopicMetadata.scala    |   8 +-
 .../kafka/controller/KafkaController.scala      |   1 -
 .../src/main/scala/kafka/server/KafkaApis.scala | 222 ++++++++++++-------
 .../test/scala/unit/kafka/admin/AdminTest.scala |  10 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   2 +-
 6 files changed, 150 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
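
At a glance: the patch replaces the flat TopicAndPartition-keyed metadataCache in KafkaApis, and the coarse object lock around it, with a cache keyed by topic and then partition, guarded by a ReentrantReadWriteLock, so topic metadata requests only take the read lock and no longer serialize behind update-metadata requests. The following is a condensed, self-contained sketch of that shape, not the patch itself: for brevity it folds the lock into the cache class (in the patch the lock stays in KafkaApis), inlines an inLock helper shaped like kafka.utils.Utils.inLock, and uses a type parameter V in place of PartitionStateInfo.

    import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
    import scala.collection.mutable

    // Sketch only: per-topic metadata cache guarded by a read-write lock.
    // V stands in for kafka.api.PartitionStateInfo; in the actual patch the
    // lock lives in KafkaApis and the cache methods themselves are lock-free.
    class TopicKeyedCache[V] {
      private val cache = mutable.HashMap[String, mutable.HashMap[Int, V]]()
      private val lock = new ReentrantReadWriteLock()

      // Same shape as kafka.utils.Utils.inLock, inlined to keep the sketch standalone.
      private def inLock[T](l: Lock)(body: => T): T = {
        l.lock()
        try body finally l.unlock()
      }

      // The update-metadata path takes the write lock.
      def addPartitionInfo(topic: String, partition: Int, info: V): Unit =
        inLock(lock.writeLock()) {
          cache.getOrElseUpdate(topic, mutable.HashMap[Int, V]()).put(partition, info)
        }

      // The topic-metadata path only takes the read lock, so concurrent
      // metadata requests no longer block one another.
      def getPartitionInfo(topic: String, partition: Int): Option[V] =
        inLock(lock.readLock()) { cache.get(topic).flatMap(_.get(partition)) }

      def containsTopic(topic: String): Boolean =
        inLock(lock.readLock()) { cache.contains(topic) }
    }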


http://git-wip-us.apache.org/repos/asf/kafka/blob/0e1d853c/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 0513a59..51380a6 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -32,9 +32,11 @@ object TopicMetadata {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
-    val partitionsMetadata = new ArrayBuffer[PartitionMetadata]()
-    for(i <- 0 until numPartitions)
-      partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers)
+    val partitionsMetadata: Array[PartitionMetadata] = new Array[PartitionMetadata](numPartitions)
+    for(i <- 0 until numPartitions) {
+      val partitionMetadata = PartitionMetadata.readFrom(buffer, brokers)
+      partitionsMetadata(partitionMetadata.partitionId) = partitionMetadata
+    }
     new TopicMetadata(topic, partitionsMetadata, errorCode)
   }
 }
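
Note on the TopicMetadata.readFrom change above: the ArrayBuffer plus append is replaced by an Array sized to numPartitions, with each entry stored at its partitionId index, so the partition metadata comes back ordered by partition id with no extra sort. This relies on Kafka partition ids being dense, i.e. exactly 0 until numPartitions. A minimal stand-alone illustration of that indexing assumption, using a stand-in case class rather than the real PartitionMetadata:

    object IndexByPartitionId extends App {
      // Stand-in for kafka.api.PartitionMetadata, just to show the indexing scheme.
      case class Meta(partitionId: Int)

      val numPartitions = 3
      val incoming = Seq(Meta(2), Meta(0), Meta(1))     // arrival order over the wire is not guaranteed
      val byId = new Array[Meta](numPartitions)
      incoming.foreach(m => byId(m.partitionId) = m)    // each element lands at its own id

      // Dense ids (0 until numPartitions) mean the array ends up fully populated and already sorted.
      assert(byId.map(_.partitionId).sameElements(0 until numPartitions))
    }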

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e1d853c/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index c8c02ce..933de9d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -36,7 +36,6 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
-import org.apache.log4j.Logger
 import java.util.concurrent.locks.ReentrantLock
 import scala.Some
 import kafka.common.TopicAndPartition

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e1d853c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 705d87e..4e11785 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,7 +31,10 @@ import kafka.utils.{Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
 import kafka.cluster.Broker
 import kafka.controller.KafkaController
+import kafka.utils.Utils.inLock
 import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.controller.KafkaController.StateChangeLogger
 
 /**
  * Logic to handle the various Kafka requests
@@ -51,12 +54,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val delayedRequestMetrics = new DelayedRequestMetrics
   /* following 3 data structures are updated by the update metadata request
   * and is queried by the topic metadata request. */
-  var metadataCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
-    new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
+  var metadataCache = new MetadataCache
   private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private val partitionMetadataLock = new Object
+  private val partitionMetadataLock = new ReentrantReadWriteLock()
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
+  class MetadataCache {
+    private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
+      new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
+
+    def addPartitionInfo(topic: String,
+                         partitionId: Int,
+                         stateInfo: PartitionStateInfo) {
+      cache.get(topic) match {
+        case Some(infos) => infos.put(partitionId, stateInfo)
+        case None => {
+          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
+          cache.put(topic, newInfos)
+          newInfos.put(partitionId, stateInfo)
+        }
+      }
+    }
+
+    def removePartitionInfo(topic: String, partitionId: Int) = {
+      cache.get(topic) match {
+        case Some(infos) => {
+          infos.remove(partitionId)
+          if(infos.isEmpty) {
+            cache.remove(topic)
+          }
+          true
+        }
+        case None => false
+      }
+    }
+
+    def getPartitionInfos(topic: String) = cache(topic)
+
+    def containsTopicAndPartition(topic: String,
+                                  partitionId: Int): Boolean = {
+      cache.get(topic) match {
+        case Some(partitionInfos) => partitionInfos.contains(partitionId)
+        case None => false
+      }
+    }
+
+    def allTopics = cache.keySet
+
+    def removeTopic(topic: String) = cache.remove(topic)
+
+    def containsTopic(topic: String) = cache.contains(topic)
+
+    def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+                    brokerId: Int,
+                    stateChangeLogger: StateChangeLogger) = {
+      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
+        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
+	        removePartitionInfo(tp.topic, tp.partition)
+          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+                                   "sent by controller %d epoch %d with correlation id %d")
+                                   .format(brokerId, tp, updateMetadataRequest.controllerId,
+                                           updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        } else {
+	        addPartitionInfo(tp.topic, tp.partition, info)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+                                   "sent by controller %d epoch %d with correlation id %d")
+                                   .format(brokerId, info, tp, updateMetadataRequest.controllerId,
+                                           updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        }
+      }
+    }
+  }
+
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
@@ -87,8 +156,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   // ensureTopicExists is only for client facing requests
   private def ensureTopicExists(topic: String) = {
-    if(!metadataCache.exists { case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic.equals(topic)})
-      throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
+    inLock(partitionMetadataLock.readLock()) {
+      if (!metadataCache.containsTopic(topic))
+        throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
+    }
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
@@ -132,26 +203,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       stateChangeLogger.warn(stateControllerEpochErrorMessage)
       throw new ControllerMovedException(stateControllerEpochErrorMessage)
     }
-    partitionMetadataLock synchronized {
+    inLock(partitionMetadataLock.writeLock()) {
       replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
       // cache the list of alive brokers in the cluster
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
-      updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
-        if (partitionState._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
-          val partition = partitionState._1
-          metadataCache.remove(partition)
-          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-                                   "sent by controller %d epoch %d with correlation id %d")
-                                   .format(brokerId, partition, updateMetadataRequest.controllerId,
-                                           updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        } else {
-          metadataCache.put(partitionState._1, partitionState._2)
-          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-                                   "sent by controller %d epoch %d with correlation id %d")
-                                   .format(brokerId, partitionState._2, partitionState._1, updateMetadataRequest.controllerId,
-                                           updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        }
-      }
+      metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
     }
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
@@ -604,62 +660,68 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
     val config = replicaManager.config
 
-    partitionMetadataLock synchronized {
-      topics.map { topic =>
-        if(metadataCache.keySet.map(_.topic).contains(topic)) {
-          val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
-          val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
-          val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
-            val replicas = metadataCache(topicAndPartition).allReplicas
-            val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-            var leaderInfo: Option[Broker] = None
-            var isrInfo: Seq[Broker] = Nil
-            val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-            val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-            val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-            debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-            try {
-              if(aliveBrokers.keySet.contains(leader))
-                leaderInfo = Some(aliveBrokers(leader))
-              else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
-              isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-              if(replicaInfo.size < replicas.size)
-                throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                  replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-              if(isrInfo.size < isr.size)
-                throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                  isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-              new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-            } catch {
-              case e: Throwable =>
-                error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
-                new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
-                  ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-            }
+    // Returning all topics when requested topics are empty
+    val isAllTopics = topics.isEmpty
+    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+    val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
+
+    inLock(partitionMetadataLock.readLock()) {
+      val topicsRequested = if (isAllTopics) metadataCache.allTopics else topics
+      for (topic <- topicsRequested) {
+        if (isAllTopics || metadataCache.containsTopic(topic)) {
+          val partitionStateInfos = metadataCache.getPartitionInfos(topic)
+          val partitionMetadata = partitionStateInfos.map {
+            case (partitionId, partitionState) =>
+              val replicas = partitionState.allReplicas
+              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+              var leaderInfo: Option[Broker] = None
+              var isrInfo: Seq[Broker] = Nil
+              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+              debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+              try {
+                leaderInfo = aliveBrokers.get(leader)
+                if (!leaderInfo.isDefined)
+                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
+                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                if (replicaInfo.size < replicas.size)
+                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                if (isrInfo.size < isr.size)
+                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+              } catch {
+                case e: Throwable =>
+                  error("Error while fetching metadata for topic %s partition %s".format(topic, partitionId), e)
+                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              }
           }
-          new TopicMetadata(topic, partitionMetadata)
+          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+        } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) {
+          topicsToBeCreated += topic
         } else {
-          // topic doesn't exist, send appropriate error code after handling auto create topics
-          val isOffsetsTopic = topic == OffsetManager.OffsetsTopicName
-          if (config.autoCreateTopicsEnable || isOffsetsTopic) {
-            try {
-              if (isOffsetsTopic)
-                AdminUtils.createTopic(zkClient, topic,
-                  config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
-              else
-                AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topic, config.numPartitions, config.defaultReplicationFactor))
-            } catch {
-              case e: TopicExistsException => // let it go, possibly another broker created this topic
-            }
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-          } else {
-            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
-          }
+          topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
         }
       }
-    }.toSeq
+    }
+
+    topicResponses.appendAll(topicsToBeCreated.map { topic =>
+      try {
+        if (topic == OffsetManager.OffsetsTopicName)
+          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
+        else
+          AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+          info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+      } catch {
+        case e: TopicExistsException => // let it go, possibly another broker created this topic
+      }
+      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
+    })
+
+    topicResponses
   }
 
   /**
@@ -667,17 +729,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    var uniqueTopics = Set.empty[String]
-    uniqueTopics = {
-      if(metadataRequest.topics.size > 0)
-        metadataRequest.topics.toSet
-      else {
-        partitionMetadataLock synchronized {
-          metadataCache.keySet.map(_.topic)
-        }
-      }
-    }
-    val topicMetadata = getTopicMetadata(uniqueTopics)
+    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
     trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","),
metadataRequest.correlationId, metadataRequest.clientId))
     val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
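
One more point worth calling out in the getTopicMetadata rewrite above: topic auto-creation no longer happens while the metadata lock is held. Topics that are missing but eligible for auto-creation are collected under the read lock, and the slow AdminUtils.createTopic calls to ZooKeeper run only after the lock has been released. A rough sketch of that collect-under-lock, act-outside-lock shape (the names below are illustrative, not the patch's exact signatures):

    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable

    object CollectThenCreate {
      private val lock = new ReentrantReadWriteLock()
      private val knownTopics = mutable.Set("existing-topic")   // stand-in for the metadata cache

      // createTopic stands in for the slow AdminUtils.createTopic call to ZooKeeper.
      def describeTopics(requested: Set[String], createTopic: String => Unit): Seq[String] = {
        val found = mutable.ListBuffer[String]()
        val missing = mutable.ListBuffer[String]()

        // Phase 1: look everything up while holding only the read lock.
        lock.readLock().lock()
        try requested.foreach(t => if (knownTopics.contains(t)) found += t else missing += t)
        finally lock.readLock().unlock()

        // Phase 2: slow side effects run with no lock held, so neither metadata
        // readers nor the update path ever wait on ZooKeeper.
        missing.foreach(createTopic)

        (found ++ missing).toSeq
      }
    }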

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e1d853c/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index d5644ea..00b17c4 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -320,9 +320,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
       assertEquals(0, partitionsRemaining.size)
-      var partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
+      var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
       assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -331,15 +331,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
-      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
+      partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfos(topic)(partition)
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
 
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfos(topic)(partition).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e1d853c/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 22bb6f2..17b08e1 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -96,7 +96,7 @@ class SimpleFetchTest extends JUnit3Suite {
    val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
 
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
    // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -169,7 +169,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
    val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
+    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e1d853c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2054c25..71ab6e1 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -531,7 +531,7 @@ object TestUtils extends Logging {
  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = {
     assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition),
       TestUtils.waitUntilTrue(() =>
-        servers.foldLeft(true)(_ && _.apis.metadataCache.keySet.contains(TopicAndPartition(topic, partition))), timeout))
+        servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout))
   }
   
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {

