kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject svn commit: r1410665 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: consumer/ConsumerFetcherManager.scala consumer/ConsumerFetcherThread.scala server/AbstractFetcherThread.scala
Date Sat, 17 Nov 2012 01:38:15 GMT
Author: jjkoshy
Date: Sat Nov 17 01:38:14 2012
New Revision: 1410665

URL: http://svn.apache.org/viewvc?rev=1410665&view=rev
Log:
Fix deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure;
patched by Joel Koshy; reviewed by Jun Rao; KAFKA-618
pre-commit-status-crumb=5e65bf7a-f347-4600-b3ae-99eed1cd2a78

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1410665&r1=1410664&r2=1410665&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Sat Nov 17 01:38:14 2012
@@ -88,7 +88,9 @@ class ConsumerFetcherManager(private val
 
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), config, sourceBroker, this)
+    new ConsumerFetcherThread(
+      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+      config, sourceBroker, partitionMap, this)
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
@@ -106,29 +108,15 @@ class ConsumerFetcherManager(private val
   }
 
   def stopAllConnections() {
-    // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread
     lock.lock()
+    // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread
     noLeaderPartitionSet.clear()
-    lock.unlock()
-
-    // second, stop all existing fetchers
-    closeAllFetchers()
-
-    // finally clear partitionMap
-    lock.lock()
+    // second, clear partitionMap
     partitionMap = null
     lock.unlock()
-  }
 
-  def getPartitionTopicInfo(topicAndPartition: TopicAndPartition) : PartitionTopicInfo = {
-    var pti :PartitionTopicInfo =null
-    lock.lock()
-    try {
-      pti = partitionMap(topicAndPartition)
-    } finally {
-      lock.unlock()
-    }
-    pti      
+    // third, stop all existing fetchers
+    closeAllFetchers()
   }
 
   def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1410665&r1=1410664&r2=1410665&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Sat Nov 17 01:38:14 2012
@@ -27,6 +27,7 @@ import kafka.common.TopicAndPartition
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
                             sourceBroker: Broker,
+                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
                                       clientId = config.clientId,
@@ -40,7 +41,7 @@ class ConsumerFetcherThread(name: String
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
-    val pti = consumerFetcherManager.getPartitionTopicInfo(topicAndPartition)
+    val pti = partitionMap(topicAndPartition)
     if (pti.getFetchOffset != fetchOffset)
       throw new RuntimeException("Offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d"
                                 .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
@@ -57,7 +58,7 @@ class ConsumerFetcherThread(name: String
     }
     val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
     val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-    val pti = consumerFetcherManager.getPartitionTopicInfo(topicAndPartition)
+    val pti = partitionMap(topicAndPartition)
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)
     newOffset

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1410665&r1=1410664&r2=1410665&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Sat Nov 17 01:38:14 2012
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class  AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
 



Mime
View raw message