kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [2/37] git commit: correlationId is not set in FetchRequest in AbstractFetcherThread; patched by Jun Rao; reviewed by Neha Narkhede and Swapnil Ghike; kafka-738
Date Mon, 04 Mar 2013 04:22:03 GMT
correlationId is not set in FetchRequest in AbstractFetcherThread; patched by Jun Rao; reviewed
by Neha Narkhede and Swapnil Ghike; kafka-738


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3a4fe9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3a4fe9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3a4fe9c

Branch: refs/heads/trunk
Commit: b3a4fe9cedca778a95b7f22054cb8f8ef6cf38c7
Parents: 1fb3e8c
Author: Jun Rao <junrao@gmail.com>
Authored: Tue Jan 29 17:00:49 2013 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Jan 29 17:00:49 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchRequest.scala   |    6 +++++-
 .../scala/kafka/server/AbstractFetcherThread.scala |   11 +++++------
 2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a4fe9c/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index ac74931..19c961e 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -201,5 +201,9 @@ class FetchRequestBuilder() {
     this
   }
 
-  def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId,
maxWait, minBytes, requestMap.toMap)
+  def build() = {
+    val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId,
maxWait, minBytes, requestMap.toMap)
+    requestMap.clear()
+    fetchRequest
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a4fe9c/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 0b286f0..1ccf578 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -46,6 +46,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
+  val fetchRequestBuilder = new FetchRequestBuilder().
+          clientId(clientId).
+          replicaId(fetcherBrokerId).
+          maxWait(maxWait).
+          minBytes(minBytes)
 
   /* callbacks to be defined in subclass */
 
@@ -65,12 +70,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   override def doWork() {
-    val fetchRequestBuilder = new FetchRequestBuilder().
-            clientId(clientId).
-            replicaId(fetcherBrokerId).
-            maxWait(maxWait).
-            minBytes(minBytes)
-
     partitionMapLock.lock()
     try {
       while (partitionMap.isEmpty)


Mime
View raw message