kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1390798 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: consumer/ producer/ utils/
Date Thu, 27 Sep 2012 01:19:30 GMT
Author: junrao
Date: Thu Sep 27 01:19:29 2012
New Revision: 1390798

URL: http://svn.apache.org/viewvc?rev=1390798&view=rev
Log:
Use getMetadata Api in ZookeeperConsumerConnector; patched by Yang Ye; reviewed by Jun Rao;
KAFKA-473

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1390798&r1=1390797&r2=1390798&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Thu Sep 27 01:19:29 2012
@@ -21,13 +21,14 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.server.{AbstractFetcherThread, AbstractFetcherManager}
 import kafka.cluster.{Cluster, Broker}
 import scala.collection.immutable
+import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
+import kafka.utils.Utils._
 import kafka.common.TopicAndPartition
 
-
 /**
  *  Usage:
  *  Once ConsumerFetcherManager is created, startConnections() and stopAllConnections() can be called repeatedly
@@ -49,21 +50,31 @@ class ConsumerFetcherManager(private val
       try {
         if (noLeaderPartitionSet.isEmpty)
           cond.await()
-        noLeaderPartitionSet.foreach {
+
+        val brokers = getAllBrokersInCluster(zkClient)
+        val topicsMetadata = getTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSeq, brokers).topicsMetadata
+        val leaderForPartitionsMap = new HashMap[(String, Int), Broker]
+        topicsMetadata.foreach(
+          tmd => {
+            val topic = tmd.topic
+            tmd.partitionsMetadata.foreach(
+            pmd => {
+              if(pmd.leader.isDefined){
+                val partition = pmd.partitionId
+                val leaderBroker = pmd.leader.get
+                leaderForPartitionsMap.put((topic, partition), leaderBroker)
+              }
+            })
+          })
+        noLeaderPartitionSet.foreach
+        {
           case(TopicAndPartition(topic, partitionId)) =>
             // find the leader for this partition
-            getLeaderForPartition(zkClient, topic, partitionId) match {
-              case Some(leaderId) =>
-                cluster.getBroker(leaderId) match {
-                  case Some(broker) =>
-                    val pti = partitionMap((topic, partitionId))
-                    addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
-                    noLeaderPartitionSet.remove(TopicAndPartition(topic, partitionId))
-                  case None =>
-                    error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
-                                  .format(leaderId, topic, partitionId))
-                }
-              case None => // let it go since we will keep retrying
+            val leaderBrokerOpt = leaderForPartitionsMap.get((topic, partitionId))
+            if(leaderBrokerOpt.isDefined){
+              val pti = partitionMap((topic, partitionId))
+              addFetcher(topic, partitionId, pti.getFetchOffset(), leaderBrokerOpt.get)
+              noLeaderPartitionSet.remove(TopicAndPartition(topic, partitionId))
             }
         }
       } finally {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1390798&r1=1390797&r2=1390798&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Sep 27 01:19:29 2012
@@ -27,13 +27,14 @@ import org.I0Itec.zkclient.exception.ZkN
 import java.net.InetAddress
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.api.OffsetRequest
 import java.util.UUID
 import kafka.serializer.Decoder
 import kafka.utils.ZkUtils._
 import kafka.common.{KafkaException, NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Utils._
+import kafka.api.OffsetRequest
 
 
 /**
@@ -410,8 +411,21 @@ private[kafka] class ZookeeperConsumerCo
     private def rebalance(cluster: Cluster): Boolean = {
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
-      val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
-
+      val brokers = getAllBrokersInCluster(zkClient)
+      val topicsMetadata = getTopicMetadata(myTopicThreadIdsMap.keySet.toSeq, brokers).topicsMetadata
+      val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
+      val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
+      topicsMetadata.foreach(m =>{
+        val topic = m.topic
+        val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
+        partitionsPerTopicMap.put(topic, partitions)
+        m.partitionsMetadata.foreach(pmd =>{
+          val partitionId = pmd.partitionId
+          val leaderOpt = pmd.leader
+          if(leaderOpt.isDefined)
+            leaderIdForPartitionsMap.put((topic, partitionId), leaderOpt.get.id)
+        })
+      })
       /**
        * fetchers must be stopped to avoid data duplication, since if the current
       * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
@@ -423,14 +437,14 @@ private[kafka] class ZookeeperConsumerCo
       releasePartitionOwnership(topicRegistry)
 
       var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
-      var currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+      val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
 
       for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
         currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
 
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         val curConsumers = consumersPerTopicMap.get(topic).get
-        var curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
+        val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
 
         val nPartsPerConsumer = curPartitions.size / curConsumers.size
         val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
@@ -454,7 +468,7 @@ private[kafka] class ZookeeperConsumerCo
             for (i <- startPart until startPart + nParts) {
               val partition = curPartitions(i)
               info(consumerThreadId + " attempting to claim partition " + partition)
-              addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
+              addPartitionTopicInfo(currentTopicRegistry, leaderIdForPartitionsMap, topicDirs, partition, topic, consumerThreadId)
               // record the partition ownership decision
               partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
             }
@@ -481,7 +495,7 @@ private[kafka] class ZookeeperConsumerCo
     private def closeFetchersForQueues(cluster: Cluster,
                                        messageStreams: Map[String,List[KafkaStream[_]]],
                                       queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
-      var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
+      val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
         case Some(f) =>
           f.stopAllConnections
@@ -571,12 +585,13 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
+                                      leaderIdForPartitionsMap: Map[(String, Int), Int],
                                       topicDirs: ZKGroupTopicDirs, partition: Int,
                                       topic: String, consumerThreadId: String) {
       val partTopicInfoMap = currentTopicRegistry.get(topic)
 
       // find the leader for this partition
-      val leaderOpt = getLeaderForPartition(zkClient, topic, partition)
+      val leaderOpt = leaderIdForPartitionsMap.get((topic, partition))
       leaderOpt match {
         case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s".
           format(partition, topic))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1390798&r1=1390797&r2=1390798&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Thu Sep 27 01:19:29 2012
@@ -70,43 +70,23 @@ class BrokerPartitionInfo(producerConfig
    * @param topic the topic for which the metadata is to be fetched
    */
   def updateInfo(topics: Seq[String]) = {
-    var fetchMetaDataSucceeded: Boolean = false
-    var i: Int = 0
-    val topicMetadataRequest = new TopicMetadataRequest(topics)
     var topicsMetadata: Seq[TopicMetadata] = Nil
-    var t: Throwable = null
-    while(i < brokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
-      info("Fetching metadata for topic %s".format(topics))
-      try {
-        topicsMetadata = producer.send(topicMetadataRequest).topicsMetadata
-        fetchMetaDataSucceeded = true
-        // throw partition specific exception
-        topicsMetadata.foreach(tmd =>{
-          trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
-          if(tmd.errorCode == ErrorMapping.NoError){
-            topicPartitionInfo.put(tmd.topic, tmd)
-          } else
-            warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
-          tmd.partitionsMetadata.foreach(pmd =>{
-            if (pmd.errorCode != ErrorMapping.NoError){
-              debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))
-            }
-          })
-        })
-        producerPool.updateProducer(topicsMetadata)
-      } catch {
-        case e =>
-          warn("fetching broker partition metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
-          t = e
-      } finally {
-        i = i + 1
-        producer.close()
-      }
-    }
-    if(!fetchMetaDataSucceeded){
-      throw new KafkaException("fetching broker partition metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
-    }
+    val topicMetadataResponse = Utils.getTopicMetadata(topics, brokers)
+    topicsMetadata = topicMetadataResponse.topicsMetadata
+    // throw partition specific exception
+    topicsMetadata.foreach(tmd =>{
+      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
+      if(tmd.errorCode == ErrorMapping.NoError){
+        topicPartitionInfo.put(tmd.topic, tmd)
+      } else
+        warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
+      tmd.partitionsMetadata.foreach(pmd =>{
+        if (pmd.errorCode != ErrorMapping.NoError){
+          debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))
+        }
+      })
+    })
+    producerPool.updateProducer(topicsMetadata)
   }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1390798&r1=1390797&r2=1390798&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Thu Sep 27 01:19:29 2012
@@ -21,23 +21,18 @@ import kafka.cluster.Broker
 import java.util.Properties
 import collection.mutable.HashMap
 import java.lang.Object
-import kafka.common.UnavailableProducerException
 import kafka.utils.{Utils, Logging}
 import kafka.api.TopicMetadata
+import kafka.common.UnavailableProducerException
 
 
 object ProducerPool{
-  def createSyncProducer(config: ProducerConfig): SyncProducer = {
-    val brokerList = config.brokerList
-    val brokers = Utils.getAllBrokersFromBrokerList(brokerList)
-    createSyncProducer(config, brokers.head)
-  }
-
-  def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
+  def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): SyncProducer = {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
-    props.putAll(config.props.props)
+    if(configOpt.isDefined)
+      props.putAll(configOpt.get.props.props)
     new SyncProducer(new SyncProducerConfig(props))
   }
 }
@@ -58,9 +53,9 @@ class ProducerPool(val config: ProducerC
       newBrokers.foreach(b => {
         if(syncProducers.contains(b.id)){
           syncProducers(b.id).close()
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
         } else
-          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
+          syncProducers.put(b.id, ProducerPool.createSyncProducer(Some(config), b))
       })
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1390798&r1=1390797&r2=1390798&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Sep 27 01:19:29 2012
@@ -31,6 +31,8 @@ import joptsimple.{OptionSpec, OptionSet
 import kafka.common.KafkaException
 import kafka.cluster.Broker
 import util.parsing.json.JSON
+import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
+import kafka.producer.{ProducerPool, SyncProducer}
 
 
 /**
@@ -674,6 +676,34 @@ object Utils extends Logging {
     }
   }
 
+  def getTopicMetadata(topics: Seq[String], brokers: Seq[Broker]): TopicMetadataResponse = {
+    var fetchMetaDataSucceeded: Boolean = false
+    var i: Int = 0
+    val topicMetadataRequest = new TopicMetadataRequest(topics)
+    var topicMetadataResponse: TopicMetadataResponse = null
+    var t: Throwable = null
+    while(i < brokers.size && !fetchMetaDataSucceeded) {
+      val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
+      info("Fetching metadata for topic %s".format(topics))
+      try {
+        topicMetadataResponse = producer.send(topicMetadataRequest)
+        fetchMetaDataSucceeded = true
+      }
+      catch {
+        case e =>
+          warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
+          t = e
+      } finally {
+        i = i + 1
+        producer.close()
+      }
+    }
+    if(!fetchMetaDataSucceeded){
+      throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
+    }
+    return topicMetadataResponse
+  }
+
   /**
    * Create a circular (looping) iterator over a collection.
    * @param coll An iterable over the underlying collection.

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1390798&r1=1390797&r2=1390798&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Sep 27 01:19:29 2012
@@ -523,14 +523,6 @@ object ZkUtils extends Logging {
     getChildren(zkClient, dirs.consumerRegistryDir)
   }
 
-  def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
-    val dirs = new ZKGroupDirs(group)
-    val consumersInGroup = getConsumersInGroup(zkClient, group)
-    val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
-      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient))
-    consumersInGroup.zip(topicCountMaps).toMap
-  }
-
   def getConsumersPerTopic(zkClient: ZkClient, group: String) : mutable.Map[String, List[String]] = {
     val dirs = new ZKGroupDirs(group)
     val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)



Mime
View raw message