kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-696 Fix toString() API for all requests to make logging easier to read; reviewed by Neha Narkhede, Jun Rao
Date Thu, 24 Jan 2013 19:52:08 GMT
Updated Branches:
  refs/heads/0.8 a15f1f2d8 -> 8d41620a4


KAFKA-696 Fix toString() API for all requests to make logging easier to read; reviewed by
Neha Narkhede, Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d41620a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d41620a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d41620a

Branch: refs/heads/0.8
Commit: 8d41620a427a027d212c241ae105d09cd470e64f
Parents: a15f1f2
Author: Sriram Subramanian <sriram.sub@gmail.com>
Authored: Thu Jan 24 11:50:18 2013 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu Jan 24 11:51:29 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchRequest.scala   |   25 +++++-
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |   24 +++++
 core/src/main/scala/kafka/api/OffsetRequest.scala  |   24 ++++-
 .../src/main/scala/kafka/api/ProducerRequest.scala |   24 ++++-
 .../main/scala/kafka/api/RequestOrResponse.scala   |    8 +-
 .../main/scala/kafka/api/StopReplicaRequest.scala  |   27 +++++-
 .../scala/kafka/api/TopicMetadataRequest.scala     |   23 ++++-
 core/src/main/scala/kafka/cluster/Partition.scala  |    4 +-
 .../main/scala/kafka/network/RequestChannel.scala  |   11 +--
 core/src/main/scala/kafka/server/KafkaApis.scala   |   81 +--------------
 10 files changed, 156 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 1bfabb0..ac74931 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -21,9 +21,10 @@ import java.nio.ByteBuffer
 import kafka.utils.nonthreadsafe
 import kafka.api.ApiUtils._
 import scala.collection.immutable.Map
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.atomic.AtomicInteger
+import kafka.network.{RequestChannel}
 
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -137,6 +138,28 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
   def isFromLowLevelConsumer = replicaId == Request.DebuggingConsumerId
 
   def numPartitions = requestInfo.size
+
+  override def toString(): String = {
+    val fetchRequest = new StringBuilder
+    fetchRequest.append("Name: " + this.getClass.getSimpleName)
+    fetchRequest.append("; Version: " + versionId)
+    fetchRequest.append("; CorrelationId: " + correlationId)
+    fetchRequest.append("; ClientId: " + clientId)
+    fetchRequest.append("; ReplicaId: " + replicaId)
+    fetchRequest.append("; MaxWait: " + maxWait + " ms")
+    fetchRequest.append("; MinBytes: " + minBytes + " bytes")
+    fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+    fetchRequest.toString()
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
+    val fetchResponsePartitionData = requestInfo.map {
+      case (topicAndPartition, data) =>
+        (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1, null))
+    }
+    val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 6955433..616f679 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -23,6 +23,9 @@ import kafka.utils._
 import kafka.api.ApiUtils._
 import kafka.cluster.Broker
 import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
 
 
 object LeaderAndIsr {
@@ -157,4 +160,25 @@ case class LeaderAndIsrRequest (versionId: Short,
       size += broker.sizeInBytes /* broker info */
     size
   }
+
+  override def toString(): String = {
+    val leaderAndIsrRequest = new StringBuilder
+    leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName)
+    leaderAndIsrRequest.append("; Version: " + versionId)
+    leaderAndIsrRequest.append("; CorrelationId: " + correlationId)
+    leaderAndIsrRequest.append("; ClientId: " + clientId)
+    leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+    leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
+    leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(","))
+    leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(","))
+    leaderAndIsrRequest.toString()
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
+    val responseMap = partitionStateInfos.map {
+      case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    val errorResponse = LeaderAndIsrResponse(correlationId, responseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 6c522bc..6360a98 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -18,8 +18,10 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.api.ApiUtils._
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
 
 
 object OffsetRequest {
@@ -104,4 +106,24 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
 
   def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId
   def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId
+
+  override def toString(): String = {
+    val offsetRequest = new StringBuilder
+    offsetRequest.append("Name: " + this.getClass.getSimpleName)
+    offsetRequest.append("; Version: " + versionId)
+    offsetRequest.append("; CorrelationId: " + correlationId)
+    offsetRequest.append("; ClientId: " + clientId)
+    offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+    offsetRequest.append("; ReplicaId: " + replicaId)
+    offsetRequest.toString()
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
+    val partitionOffsetResponseMap = requestInfo.map {
+      case (topicAndPartition, partitionOffsetRequest) =>
+        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
null))
+    }
+    val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index ffa96a6..72b2cba 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -20,8 +20,10 @@ package kafka.api
 import java.nio._
 import kafka.message._
 import scala.collection.Map
-import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
+import kafka.common._
+import kafka.network.RequestChannel.Response
+import kafka.network.{RequestChannel, BoundedByteBufferSend}
 
 object ProducerRequest {
   val CurrentVersion = 0.shortValue
@@ -120,5 +122,25 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
 
   def numPartitions = data.size
 
+  override def toString(): String = {
+    val producerRequest = new StringBuilder
+    producerRequest.append("Name: " + this.getClass.getSimpleName)
+    producerRequest.append("; Version: " + versionId)
+    producerRequest.append("; CorrelationId: " + correlationId)
+    producerRequest.append("; ClientId: " + clientId)
+    producerRequest.append("; RequiredAcks: " + requiredAcks)
+    producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+    producerRequest.append("; TopicAndPartition: " + data.map(r => r._1 -> r._2.sizeInBytes).toMap.mkString(","))
+    producerRequest.toString()
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
+    val producerResponseStatus = data.map {
+      case (topicAndPartition, data) =>
+        (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1l))
+    }
+    val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 83ad42c..3175e1c 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -18,6 +18,8 @@ package kafka.api
  */
 
 import java.nio._
+import kafka.network.RequestChannel
+import kafka.utils.Logging
 
 object Request {
   val OrdinaryConsumerId: Int = -1
@@ -25,10 +27,12 @@ object Request {
 }
 
 
-private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends
Logging{
 
   def sizeInBytes: Int
   
   def writeTo(buffer: ByteBuffer): Unit
-  
+
+  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {}
 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 9fe849b..0580636 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -20,8 +20,10 @@ package kafka.api
 
 import java.nio._
 import kafka.api.ApiUtils._
-import kafka.utils.Logging
-import kafka.network.InvalidRequestException
+import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import kafka.utils.{Logging}
 
 
 object StopReplicaRequest extends Logging {
@@ -93,4 +95,25 @@ case class StopReplicaRequest(versionId: Short,
     }
     size
   }
+
+  override def toString(): String = {
+    val stopReplicaRequest = new StringBuilder
+    stopReplicaRequest.append("Name: " + this.getClass.getSimpleName)
+    stopReplicaRequest.append("; Version: " + versionId)
+    stopReplicaRequest.append("; CorrelationId: " + correlationId)
+    stopReplicaRequest.append("; ClientId: " + clientId)
+    stopReplicaRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+    stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
+    stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
+    stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
+    stopReplicaRequest.toString()
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
+    val responseMap = partitions.map {
+      case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }.toMap
+    val errorResponse = StopReplicaResponse(correlationId, responseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index fe1170f..824f8f1 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -20,7 +20,10 @@ package kafka.api
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import collection.mutable.ListBuffer
-import kafka.utils.Logging
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import kafka.utils.{Logging}
 
 object TopicMetadataRequest extends Logging {
   val CurrentVersion = 0.shortValue
@@ -67,4 +70,22 @@ case class TopicMetadataRequest(val versionId: Short,
     4 + /* number of topics */
     topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
   }
+
+  override def toString(): String = {
+    val topicMetadataRequest = new StringBuilder
+    topicMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+    topicMetadataRequest.append("; Version: " + versionId)
+    topicMetadataRequest.append("; CorrelationId: " + correlationId)
+    topicMetadataRequest.append("; ClientId: " + clientId)
+    topicMetadataRequest.append("; Topics: " + topics.mkString(","))
+    topicMetadataRequest.toString()
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
+    val topicMetadata = topics.map {
+      topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    val errorResponse = TopicMetadataResponse(topicMetadata, correlationId)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ea5b5a0..71eb980 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -337,8 +337,8 @@ class Partition(val topic: String,
     partitionString.append("Topic: " + topic)
     partitionString.append("; Partition: " + partitionId)
     partitionString.append("; Leader: " + leaderReplicaIdOpt)
-    partitionString.append("; Assigned replicas: " + assignedReplicaMap.keys.mkString(","))
-    partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+    partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+    partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
     partitionString.toString()
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 848c877..5185dec 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -46,12 +46,7 @@ object RequestChannel extends Logging {
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer.rewind()
-    buffer.getShort
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = ApiUtils.readShortString(buffer)
-    buffer.rewind()
-    trace("Received request v%d with correlation id %d from client %s: %s".format(versionId,
correlationId, clientId, requestObj))
+    trace("Received request : %s".format(requestObj))
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds
@@ -80,8 +75,8 @@ object RequestChannel extends Logging {
              m.responseSendTimeHist.update(responseSendTime)
              m.totalTimeHist.update(totalTime)
       }
-      trace("Completed request v%d with correlation id %d and client %s: %s, totalTime:%d,
queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d"
-        .format(versionId, correlationId, clientId, requestObj, totalTime, queueTime, apiLocalTime,
apiRemoteTime, responseSendTime))
+      trace("Completed request : %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d,
sendTime:%d"
+        .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 60752fb..0a1a11a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -54,6 +54,8 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handle(request: RequestChannel.Request) {
     try{
+      if(requestLogger.isTraceEnabled)
+        requestLogger.trace("Handling request: %s".format(request.requestObj))
       request.requestId match {
         case RequestKeys.ProduceKey => handleProducerRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)
@@ -65,68 +67,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     } catch {
       case e: Throwable =>
-        request.requestId match {
-          case RequestKeys.ProduceKey =>
-            val apiRequest = request.requestObj.asInstanceOf[ProducerRequest]
-            val producerResponseStatus = apiRequest.data.map {
-              case (topicAndPartition, data) =>
-                (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1L))
-            }
-            val errorResponse = ProducerResponse(apiRequest.correlationId, producerResponseStatus)
-            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
-            error("error when handling request %s".format(apiRequest), e)
-          case RequestKeys.FetchKey =>
-            val apiRequest = request.requestObj.asInstanceOf[FetchRequest]
-            val fetchResponsePartitionData = apiRequest.requestInfo.map {
-              case (topicAndPartition, data) =>
-                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-1, null))
-            }
-            val errorResponse = FetchResponse(apiRequest.correlationId, fetchResponsePartitionData)
-            requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
-            error("error when handling request %s".format(apiRequest), e)
-          case RequestKeys.OffsetsKey =>
-            val apiRequest = request.requestObj.asInstanceOf[OffsetRequest]
-            val partitionOffsetResponseMap = apiRequest.requestInfo.map {
-              case (topicAndPartition, partitionOffsetRequest) =>
-                (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
null))
-            }
-            val errorResponse = OffsetResponse(apiRequest.correlationId, partitionOffsetResponseMap)
-            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
-            error("error when handling request %s".format(apiRequest), e)
-          case RequestKeys.MetadataKey =>
-            val apiRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-            val topicMeatadata = apiRequest.topics.map {
-              topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-            }
-            val errorResponse = TopicMetadataResponse(topicMeatadata, apiRequest.correlationId)
-            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
-            error("error when handling request %s".format(apiRequest), e)
-          case RequestKeys.LeaderAndIsrKey =>
-            val apiRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
-            val responseMap = apiRequest.partitionStateInfos.map {
-              case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-            }
-            val errorResponse = LeaderAndIsrResponse(apiRequest.correlationId, responseMap)
-            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
-            error("error when handling request %s".format(apiRequest), e)
-          case RequestKeys.StopReplicaKey =>
-            val apiRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
-            val responseMap = apiRequest.partitions.map {
-              case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-            }.toMap
-            error("error when handling request %s".format(apiRequest), e)
-            val errorResponse = StopReplicaResponse(apiRequest.correlationId, responseMap)
-            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
-        }
+        request.requestObj.handleError(e, requestChannel, request)
+        error("error when handling request %s".format(request.requestObj), e)
     } finally
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
     val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling LeaderAndIsrRequest v%d with correlation id %d from client
%s: %s"
-            .format(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, leaderAndIsrRequest.clientId,
leaderAndIsrRequest.toString))
     try {
       val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
       val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId,
response, error)
@@ -141,14 +89,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleStopReplicaRequest(request: RequestChannel.Request) {
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling StopReplicaRequest v%d with correlation id %d from client
%s: %s"
-            .format(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, stopReplicaRequest.clientId,
stopReplicaRequest.toString))
-
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap,
error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
-
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
   }
 
@@ -174,10 +117,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleProducerRequest(request: RequestChannel.Request) {
     val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
     val sTime = SystemTime.milliseconds
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling ProducerRequest v%d with correlation id %d from client
%s: %s"
-            .format(produceRequest.versionId, produceRequest.correlationId, produceRequest.clientId,
produceRequest.toString))
-
     val localProduceResults = appendToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
@@ -272,10 +211,6 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling FetchRequest v%d with correlation id %d from client %s:
%s"
-            .format(fetchRequest.versionId, fetchRequest.correlationId, fetchRequest.clientId,
fetchRequest.toString))
-
     if(fetchRequest.isFromFollower) {
       maybeUpdatePartitionHw(fetchRequest)
       // after updating HW, some delayed produce requests may be unblocked
@@ -382,10 +317,6 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling OffsetRequest v%d with correlation id %d from client
%s: %s"
-            .format(offsetRequest.versionId, offsetRequest.correlationId, offsetRequest.clientId,
offsetRequest.toString))
-
     val responseMap = offsetRequest.requestInfo.map(elem => {
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
       try {
@@ -422,10 +353,6 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling TopicMetadataRequest v%d with correlation id %d from
client %s: %s"
-            .format(metadataRequest.versionId, metadataRequest.correlationId, metadataRequest.clientId,
metadataRequest.toString))
-
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = replicaManager.config
     val uniqueTopics = {


Mime
View raw message