kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1396116 - /incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
Date Tue, 09 Oct 2012 17:08:42 GMT
Author: junrao
Date: Tue Oct  9 17:08:42 2012
New Revision: 1396116

URL: http://svn.apache.org/viewvc?rev=1396116&view=rev
Log:
support changing host/port of a broker; patched by David Arthur; reviewed by Jun Rao; kafka-474

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1396116&r1=1396115&r2=1396116&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
Tue Oct  9 17:08:42 2012
@@ -23,7 +23,7 @@ import kafka.cluster.Broker
 
 abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends
Logging {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
-  private val fetcherThreadMap = new mutable.HashMap[(Int, Int), AbstractFetcherThread]
+  private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "], "
 
@@ -37,7 +37,7 @@ abstract class AbstractFetcherManager(pr
   def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker)
{
     mapLock synchronized {
       var fetcherThread: AbstractFetcherThread = null
-      val key = (sourceBroker.id, getFetcherId(topic, partitionId))
+      val key = (sourceBroker, getFetcherId(topic, partitionId))
       fetcherThreadMap.get(key) match {
         case Some(f) => fetcherThread = f
         case None =>
@@ -64,15 +64,6 @@ abstract class AbstractFetcherManager(pr
     }
   }
 
-  def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = {
-    mapLock synchronized {
-      for ( ((sourceBrokerId, _), fetcher) <- fetcherThreadMap)
-        if (fetcher.hasPartition(topic, partitionId))
-          return Some(sourceBrokerId)
-    }
-    None
-  }
-
   def closeAllFetchers() {
     mapLock synchronized {
       for ( (_, fetcher) <- fetcherThreadMap) {
@@ -81,4 +72,4 @@ abstract class AbstractFetcherManager(pr
       fetcherThreadMap.clear()
     }
   }
-}
\ No newline at end of file
+}



Mime
View raw message