kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2150; move partitionMapCond.await into partitionMapLock; reviewed by Guozhang Wang
Date Tue, 12 May 2015 00:42:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4328aa02c -> af5061923


KAFKA-2150; move partitionMapCond.await into partitionMapLock; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af506192
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af506192
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af506192

Branch: refs/heads/trunk
Commit: af506192332f223393639bb390c38a13e0f2ca82
Parents: 4328aa0
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Mon May 11 17:41:54 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon May 11 17:41:54 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/AbstractFetcherThread.scala   | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/af506192/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index a439046..83fc474 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -76,6 +76,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   override def doWork() {
+    var fetchRequest: FetchRequest = null
 
     inLock(partitionMapLock) {
       partitionMap.foreach {
@@ -84,16 +85,16 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
             fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
               partitionFetchState.offset, fetchSize)
       }
-    }
 
-    val fetchRequest = fetchRequestBuilder.build()
+      fetchRequest = fetchRequestBuilder.build()
+      if (fetchRequest.requestInfo.isEmpty) {
+        trace("There are no active partitions. Back off for %d ms before sending a fetch
request".format(fetchBackOffMs))
+        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
+      }
+    }
 
-    if (!fetchRequest.requestInfo.isEmpty)
+    if(!fetchRequest.requestInfo.isEmpty)
       processFetchRequest(fetchRequest)
-    else {
-      trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
-      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
-    }
   }
 
   private def processFetchRequest(fetchRequest: FetchRequest) {


Mime
View raw message