kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1404857 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: api/LeaderAndIsrResponse.scala api/StopReplicaResponse.scala server/KafkaApis.scala server/KafkaRequestHandler.scala
Date Fri, 02 Nov 2012 02:07:03 GMT
Author: junrao
Date: Fri Nov  2 02:07:03 2012
New Revision: 1404857

URL: http://svn.apache.org/viewvc?rev=1404857&view=rev
Log:
KafkaRequestHandler needs to handle exceptions; patched by Yang Ye; reviewed by Jun Rao; KAFKA-491

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala?rev=1404857&r1=1404856&r2=1404857&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala Fri Nov  2 02:07:03 2012
@@ -17,7 +17,6 @@
 
 package kafka.api
 
-import kafka.common.ErrorMapping
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import collection.mutable.HashMap
@@ -27,7 +26,6 @@ import collection.Map
 object LeaderAndIsrResponse {
   def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
     val versionId = buffer.getShort
-    val errorCode = buffer.getShort
     val numEntries = buffer.getInt
     val responseMap = new HashMap[(String, Int), Short]()
     for (i<- 0 until numEntries){
@@ -36,17 +34,16 @@ object LeaderAndIsrResponse {
       val partitionErrorCode = buffer.getShort
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new LeaderAndIsrResponse(versionId, responseMap, errorCode)
+    new LeaderAndIsrResponse(versionId, responseMap)
   }
 }
 
 
 case class LeaderAndIsrResponse(versionId: Short,
-                                responseMap: Map[(String, Int), Short],
-                                errorCode: Short = ErrorMapping.NoError)
+                                responseMap: Map[(String, Int), Short])
         extends RequestOrResponse {
   def sizeInBytes(): Int ={
-    var size = 2 + 2 + 4
+    var size =  2 + 4
     for ((key, value) <- responseMap){
       size += 2 + key._1.length + 4 + 2
     }
@@ -55,7 +52,6 @@ case class LeaderAndIsrResponse(versionI
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
-    buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){
       writeShortString(buffer, key._1)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala?rev=1404857&r1=1404856&r2=1404857&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala Fri Nov  2 02:07:03 2012
@@ -19,15 +19,13 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import collection.mutable.HashMap
-import collection.mutable.Map
-import kafka.common.ErrorMapping
+import collection.Map
 import kafka.api.ApiUtils._
 
 
 object StopReplicaResponse {
   def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
     val versionId = buffer.getShort
-    val errorCode = buffer.getShort
     val numEntries = buffer.getInt
 
     val responseMap = new HashMap[(String, Int), Short]()
@@ -37,16 +35,15 @@ object StopReplicaResponse {
       val partitionErrorCode = buffer.getShort()
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new StopReplicaResponse(versionId, responseMap, errorCode)
+    new StopReplicaResponse(versionId, responseMap)
   }
 }
 
 
 case class StopReplicaResponse(val versionId: Short,
-                               val responseMap: Map[(String, Int), Short],
-                               val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+                               val responseMap: Map[(String, Int), Short]) extends RequestOrResponse{
   def sizeInBytes(): Int ={
-    var size = 2 + 2 + 4
+    var size = 2 + 4
     for ((key, value) <- responseMap){
       size += (2 + key._1.length) + 4 + 2
     }
@@ -55,7 +52,6 @@ case class StopReplicaResponse(val versi
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
-    buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){
       writeShortString(buffer, key._1)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1404857&r1=1404856&r2=1404857&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Nov  2 02:07:03 2012
@@ -52,16 +52,73 @@ class KafkaApis(val requestChannel: Requ
    * Top-level method that handles all requests and multiplexes to the right api
    */
   def handle(request: RequestChannel.Request) {
-    request.requestId match {
-      case RequestKeys.ProduceKey => handleProducerRequest(request)
-      case RequestKeys.FetchKey => handleFetchRequest(request)
-      case RequestKeys.OffsetsKey => handleOffsetRequest(request)
-      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
-      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
-      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
-      case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
-    }
-    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
+    try{
+      request.requestId match {
+        case RequestKeys.ProduceKey => handleProducerRequest(request)
+        case RequestKeys.FetchKey => handleFetchRequest(request)
+        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
+        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
+        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
+        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+        case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
+      }
+    } catch {
+      case e: Throwable =>
+        request.requestId match {
+          case RequestKeys.ProduceKey =>
+            val apiRequest = request.requestObj.asInstanceOf[ProducerRequest]
+            val producerResponseStatus = apiRequest.data.map {
+              case (topicAndPartition, data) =>
+                (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+            }
+            val errorResponse = ProducerResponse(apiRequest.versionId, apiRequest.correlationId, producerResponseStatus)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.FetchKey =>
+            val apiRequest = request.requestObj.asInstanceOf[FetchRequest]
+            val fetchResponsePartitionData = apiRequest.requestInfo.map {
+              case (topicAndPartition, data) =>
+                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), 0, -1, null))
+            }
+            val errorResponse = FetchResponse(apiRequest.versionId, apiRequest.correlationId, fetchResponsePartitionData)
+            requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.OffsetsKey =>
+            val apiRequest = request.requestObj.asInstanceOf[OffsetRequest]
+            val partitionOffsetResponseMap = apiRequest.requestInfo.map {
+              case (topicAndPartition, partitionOffsetRequest) =>
+                (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
+            }
+            val errorResponse = OffsetResponse(apiRequest.versionId, partitionOffsetResponseMap)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.MetadataKey =>
+            val apiRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
+            val topicMeatadata = apiRequest.topics.map {
+              topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+            }
+            val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.LeaderAndIsrKey =>
+            val apiRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
+            val responseMap = apiRequest.partitionStateInfos.map {
+              case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+            }
+            val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, responseMap)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+            error("error when handling request %s".format(apiRequest), e)
+          case RequestKeys.StopReplicaKey =>
+            val apiRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
+            val responseMap = apiRequest.partitions.map {
+              case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+            }.toMap
+            error("error when handling request %s".format(apiRequest), e)
+            val errorResponse = StopReplicaResponse(apiRequest.versionId, responseMap)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+        }
+    } finally
+      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1404857&r1=1404856&r2=1404857&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Fri Nov  2 02:07:03 2012
@@ -29,15 +29,19 @@ class KafkaRequestHandler(id: Int, broke
   this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
 
   def run() { 
-    while(true) { 
-      val req = requestChannel.receiveRequest()
-      if(req eq RequestChannel.AllDone){
-        trace("receives shut down command, shut down".format(brokerId, id))
-        return
+    while(true) {
+      try {
+        val req = requestChannel.receiveRequest()
+        if(req eq RequestChannel.AllDone){
+          trace("receives shut down command, shut down".format(brokerId, id))
+          return
+        }
+        req.dequeueTimeMs = SystemTime.milliseconds
+        debug("handles request " + req)
+        apis.handle(req)
+      } catch {
+        case e: Throwable => error("exception when handling request", e)
       }
-      req.dequeueTimeMs = SystemTime.milliseconds
-      debug("handles request " + req)
-      apis.handle(req)
     }
   }
 



Mime
View raw message