kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1396465 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/consumer/ main/scala/kafka/producer/ main/scala/kafka/producer/async/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/admi...
Date Wed, 10 Oct 2012 04:24:27 GMT
Author: jkreps
Date: Wed Oct 10 04:24:27 2012
New Revision: 1396465

URL: http://svn.apache.org/viewvc?rev=1396465&view=rev
Log:
KAFKA-543 Avoid sending duplicate topic names in TopicMetadataRequest. Reviewed by Neha.


Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Wed Oct 10 04:24:27 2012
@@ -23,6 +23,7 @@ import kafka.cluster.Broker
 import kafka.utils.{Logging, Utils, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import scala.collection._
 import scala.collection.mutable
 import kafka.common.{BrokerNotAvailableException, LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
 
@@ -85,57 +86,61 @@ object AdminUtils extends Logging {
       case e2 => throw new AdministrationException(e2.toString)
     }
   }
+  
+  def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata = 
+    fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
 
-  def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[TopicMetadata] = {
+  def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
     val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
-    topics.map { topic =>
-      if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
-        val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient,
List(topic)).get(topic).get
-        val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1
< m2._1)
-
-        val partitionMetadata = sortedPartitions.map { partitionMap =>
-          val partition = partitionMap._1
-          val replicas = partitionMap._2
-          val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
-          val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-          debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ",
leader = " + leader)
-
-          var leaderInfo: Option[Broker] = None
-          var replicaInfo: Seq[Broker] = Nil
-          var isrInfo: Seq[Broker] = Nil
-          try {
-            try {
-              leaderInfo = leader match {
-                case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo,
List(l)).head)
-                case None => throw new LeaderNotAvailableException("No leader exists for
partition " + partition)
-              }
-            }catch {
-              case e => throw new LeaderNotAvailableException("Leader not available for
topic %s partition %d"
-                .format(topic, partition))
-            }
-
-            try {
-              replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id
=> id.toInt))
-              isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
-            }catch {
-              case e => throw new ReplicaNotAvailableException(e)
-            }
-
-            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-          }catch {
-            case e: ReplicaNotAvailableException =>
-              new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-                                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-            case le: LeaderNotAvailableException =>
-              new PartitionMetadata(partition, None, replicaInfo, isrInfo,
-                                    ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]))
+    topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
+  }
+  
+  private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
+    if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
+      val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
+      val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
+
+      val partitionMetadata = sortedPartitions.map { partitionMap =>
+      val partition = partitionMap._1
+      val replicas = partitionMap._2
+      val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
+      val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+      debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
+
+      var leaderInfo: Option[Broker] = None
+      var replicaInfo: Seq[Broker] = Nil
+      var isrInfo: Seq[Broker] = Nil
+      try {
+        try {
+          leaderInfo = leader match {
+            case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+            case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
           }
+        } catch {
+          case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition))
+        }
+
+        try {
+          replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
+          isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
+        } catch {
+          case e => throw new ReplicaNotAvailableException(e)
+        }
+
+          new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+        } catch {
+          case e: ReplicaNotAvailableException =>
+            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
+                                  ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+          case le: LeaderNotAvailableException =>
+            new PartitionMetadata(partition, None, replicaInfo, isrInfo,
+                                  ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]))
         }
-        new TopicMetadata(topic, partitionMetadata)
-      } else {
-        // topic doesn't exist, send appropriate error code
-        new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
       }
+      new TopicMetadata(topic, partitionMetadata)
+    } else {
+      // topic doesn't exist, send appropriate error code
+      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ListTopicCommand.scala Wed
Oct 10 04:24:27 2012
@@ -77,7 +77,7 @@ object ListTopicCommand {
   }
 
   def showTopic(topic: String, zkClient: ZkClient) {
-    val topicMetaData = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+    val topicMetaData = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
     topicMetaData.errorCode match {
       case ErrorMapping.UnknownTopicOrPartitionCode =>
         println("topic " + topic + " doesn't exist!")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
Wed Oct 10 04:24:27 2012
@@ -52,7 +52,7 @@ class ConsumerFetcherManager(private val
           cond.await()
 
         val brokers = getAllBrokersInCluster(zkClient)
-        val topicsMetadata = getTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSeq,
brokers).topicsMetadata
+        val topicsMetadata = getTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
brokers).topicsMetadata
         val leaderForPartitionsMap = new HashMap[(String, Int), Broker]
         topicsMetadata.foreach(
           tmd => {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Wed Oct 10 04:24:27 2012
@@ -414,7 +414,7 @@ private[kafka] class ZookeeperConsumerCo
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val brokers = getAllBrokersInCluster(zkClient)
-      val topicsMetadata = getTopicMetadata(myTopicThreadIdsMap.keySet.toSeq, brokers).topicsMetadata
+      val topicsMetadata = getTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
       val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
Wed Oct 10 04:24:27 2012
@@ -45,7 +45,7 @@ class BrokerPartitionInfo(producerConfig
         case Some(m) => m
         case None =>
           // refresh the topic metadata cache
-          updateInfo(List(topic))
+          updateInfo(Set(topic))
           val topicMetadata = topicPartitionInfo.get(topic)
           topicMetadata match {
             case Some(m) => m
@@ -69,7 +69,7 @@ class BrokerPartitionInfo(producerConfig
    * It updates the cache by issuing a get topic metadata request to a random broker.
    * @param topic the topic for which the metadata is to be fetched
    */
-  def updateInfo(topics: Seq[String]) = {
+  def updateInfo(topics: Set[String]) = {
     var topicsMetadata: Seq[TopicMetadata] = Nil
     val topicMetadataResponse = Utils.getTopicMetadata(topics, brokers)
     topicsMetadata = topicMetadataResponse.topicsMetadata

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
Wed Oct 10 04:24:27 2012
@@ -55,7 +55,7 @@ class DefaultEventHandler[K,V](config: P
           // back off and update the topic metadata cache before attempting another send
operation
           Thread.sleep(config.producerRetryBackoffMs)
           // get topics of the outstanding produce requests and refresh metadata for those
-          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
+          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic).toSet))
           remainingRetries -= 1
           ProducerStats.resendRate.mark()
         }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Oct 10 04:24:27 2012
@@ -358,33 +358,33 @@ class KafkaApis(val requestChannel: Requ
 
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = replicaManager.config
-    val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
-    metadataRequest.topics.zip(topicMetadataList).foreach(
+    val uniqueTopics = metadataRequest.topics.toSet
+    val topicMetadataList = AdminUtils.fetchTopicMetadataFromZk(uniqueTopics, zkClient)
+    topicMetadataList.foreach(
       topicAndMetadata => {
-        val topic = topicAndMetadata._1
-        topicAndMetadata._2.errorCode match {
-          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
+        topicAndMetadata.errorCode match {
+          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata
           case ErrorMapping.UnknownTopicOrPartitionCode =>
             try {
               /* check if auto creation of topics is turned on */
               if (config.autoCreateTopics) {
-                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+                CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions,
config.defaultReplicationFactor)
                 info("Auto creation of topic %s with %d partitions and replication factor
%d is successful!"
-                             .format(topic, config.numPartitions, config.defaultReplicationFactor))
-                val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+                             .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+                val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicAndMetadata.topic,
zkClient)
                 topicsMetadata += newTopicMetadata
                 newTopicMetadata.errorCode match {
                   case ErrorMapping.NoError =>
-                  case _ => throw new KafkaException("Topic metadata for automatically
created topic %s does not exist".format(topic))
+                  case _ => throw new KafkaException("Topic metadata for automatically
created topic %s does not exist".format(topicAndMetadata.topic))
                 }
               }
             } catch {
               case e => error("Error while retrieving topic metadata", e)
             }
           case _ => 
-            error("Error while fetching topic metadata for topic " + topic,
-                  ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
-            topicsMetadata += topicAndMetadata._2
+            error("Error while fetching topic metadata for topic " + topicAndMetadata.topic,
+                  ErrorMapping.exceptionFor(topicAndMetadata.errorCode).getCause)
+            topicsMetadata += topicAndMetadata
         }
       })
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Wed Oct 10 04:24:27 2012
@@ -677,10 +677,10 @@ object Utils extends Logging {
     }
   }
 
-  def getTopicMetadata(topics: Seq[String], brokers: Seq[Broker]): TopicMetadataResponse
= {
+  def getTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
-    val topicMetadataRequest = new TopicMetadataRequest(topics)
+    val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     while(i < brokers.size && !fetchMetaDataSucceeded) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Wed
Oct 10 04:24:27 2012
@@ -158,13 +158,11 @@ class AdminTest extends JUnit3Suite with
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
-    val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-                                  .partitionsMetadata.map(p => p.replicas)
+    val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p
=> p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
-    for( i <- 0 until actualReplicaList.size ) {
+    for(i <- 0 until actualReplicaList.size)
       assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
-    }
 
     try {
       AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment,
zkClient)
@@ -191,7 +189,7 @@ class AdminTest extends JUnit3Suite with
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
 
-    val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+    val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
     newTopicMetadata.errorCode match {
       case ErrorMapping.UnknownTopicOrPartitionCode =>
         fail("Topic " + topic + " should've been automatically created")

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
Wed Oct 10 04:24:27 2012
@@ -318,8 +318,7 @@ class PrimitiveApiTest extends JUnit3Sui
     val newTopic = "new-topic"
     CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.getTopicMetaDataFromZK(List(newTopic),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0,
10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1396465&r1=1396464&r2=1396465&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
Wed Oct 10 04:24:27 2012
@@ -88,8 +88,7 @@ class ProducerTest extends JUnit3Suite w
   def testUpdateBrokerPartitionInfo() {
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val props1 = new util.Properties()
@@ -154,8 +153,7 @@ class ProducerTest extends JUnit3Suite w
     // create topic with 1 partition and await leadership
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val producer1 = new Producer[String, String](producerConfig1)
@@ -206,8 +204,7 @@ class ProducerTest extends JUnit3Suite w
     // create topic
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
@@ -267,8 +264,7 @@ class ProducerTest extends JUnit3Suite w
     // create topics in ZK
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.getTopicMetaDataFromZK(List("new-topic"),
-        zkClient).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     // do a simple test to make sure plumbing is okay



Mime
View raw message