kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1401356 - /incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
Date Tue, 23 Oct 2012 17:25:03 GMT
Author: junrao
Date: Tue Oct 23 17:25:02 2012
New Revision: 1401356

URL: http://svn.apache.org/viewvc?rev=1401356&view=rev
Log:
system test testcase_0122 under replication fails due to large # of data loss; patched by
Jun Rao; reviewed by Neha Narkhede; KAFKA-580

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1401356&r1=1401355&r2=1401356&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
Tue Oct 23 17:25:02 2012
@@ -51,31 +51,32 @@ class ConsumerFetcherManager(private val
         if (noLeaderPartitionSet.isEmpty)
           cond.await()
 
-        val brokers = getAllBrokersInCluster(zkClient)
-        val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m =>
m.topic).toSet, brokers).topicsMetadata
-        val leaderForPartitionsMap = new HashMap[(String, Int), Broker]
-        topicsMetadata.foreach(
-          tmd => {
-            val topic = tmd.topic
-            tmd.partitionsMetadata.foreach(
-            pmd => {
-              if(pmd.leader.isDefined){
-                val partition = pmd.partitionId
-                val leaderBroker = pmd.leader.get
-                leaderForPartitionsMap.put((topic, partition), leaderBroker)
-              }
+        try {
+          trace("Partitions without leader %s".format(noLeaderPartitionSet))
+          val brokers = getAllBrokersInCluster(zkClient)
+          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m
=> m.topic).toSet, brokers).topicsMetadata
+          val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
+          topicsMetadata.foreach(
+            tmd => {
+              val topic = tmd.topic
+              tmd.partitionsMetadata.foreach(
+              pmd => {
+                val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
+                if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition))
{
+                  val leaderBroker = pmd.leader.get
+                  leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
+                }
+              })
             })
-          })
-        noLeaderPartitionSet.foreach
-        {
-          case(TopicAndPartition(topic, partitionId)) =>
-            // find the leader for this partition
-            val leaderBrokerOpt = leaderForPartitionsMap.get((topic, partitionId))
-            if(leaderBrokerOpt.isDefined){
-              val pti = partitionMap(TopicAndPartition(topic, partitionId))
-              addFetcher(topic, partitionId, pti.getFetchOffset(), leaderBrokerOpt.get)
-              noLeaderPartitionSet.remove(TopicAndPartition(topic, partitionId))
-            }
+
+          leaderForPartitionsMap.foreach{
+            case(topicAndPartition, leaderBroker) =>
+              val pti = partitionMap(topicAndPartition)
+              addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(),
leaderBroker)
+          }
+          noLeaderPartitionSet --= leaderForPartitionsMap.keySet
+        } catch {
+          case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet),
t)
         }
       } finally {
         lock.unlock()



Mime
View raw message