kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1231298 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: network/RequestChannel.scala server/KafkaApis.scala server/KafkaRequestHandler.scala
Date Fri, 13 Jan 2012 21:15:29 GMT
Author: nehanarkhede
Date: Fri Jan 13 21:15:29 2012
New Revision: 1231298

URL: http://svn.apache.org/viewvc?rev=1231298&view=rev
Log:
Checking in some files left from the last checkin for KAFKA-202

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1231298&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Fri Jan 13 21:15:29 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.ArrayList
+import java.util.concurrent._
+
+object RequestChannel { 
+  val AllDone = new Request(1, 2, null, 0)
+  case class Request(val processor: Int, requestKey: Any, request: Receive, start: Long)
+  case class Response(val processor: Int, requestKey: Any, response: Send, start: Long, ellapsed: Long)
+}
+
+class RequestChannel(val numProcessors: Int, val queueSize: Int) { 
+  private var responseListeners: List[(Int) => Unit] = Nil
+  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
+  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
+  for(i <- 0 until numProcessors)
+    responseQueues(i) = new ArrayBlockingQueue[RequestChannel.Response](queueSize)
+    
+
+  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
+  def sendRequest(request: RequestChannel.Request) {
+    requestQueue.put(request)
+  }
+  
+  /** Send a response back to the socket server to be sent over the network */ 
+  def sendResponse(response: RequestChannel.Response) {
+    responseQueues(response.processor).put(response)
+    for(onResponse <- responseListeners)
+      onResponse(response.processor)
+  }
+
+  /** Get the next request or block until there is one */
+  def receiveRequest(): RequestChannel.Request = 
+    requestQueue.take()
+
+  /** Get a response for the given processor if there is one */
+  def receiveResponse(processor: Int): RequestChannel.Response =
+    responseQueues(processor).poll()
+
+  def addResponseListener(onResponse: Int => Unit) { 
+    responseListeners ::= onResponse
+  }
+
+}
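
A minimal usage sketch of the RequestChannel API added above (not part of the commit); the processor/queue sizes, the "key" request key, and the nulls standing in for real Receive/Send objects are placeholders for illustration:

    import kafka.network._
    import kafka.utils.SystemTime

    object RequestChannelSketch {
      def main(args: Array[String]) {
        val channel = new RequestChannel(numProcessors = 1, queueSize = 16)

        // Socket-server side: get notified when a response is queued for a processor,
        // then enqueue an incoming request for processor 0.
        channel.addResponseListener(p => println("response ready for processor " + p))
        channel.sendRequest(new RequestChannel.Request(processor = 0, requestKey = "key",
                                                       request = null, start = SystemTime.milliseconds))

        // Handler side: block for the next request, do the work, then send the
        // response back to the processor that owns the connection.
        val req = channel.receiveRequest()
        channel.sendResponse(new RequestChannel.Response(req.processor, req.requestKey,
                                                         response = null, start = req.start, ellapsed = -1))

        // Socket-server side again: drain the response queue for processor 0.
        val resp = channel.receiveResponse(0)
        println("got response for request key " + resp.requestKey)
      }
    }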

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1231298&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Jan 13 21:15:29 2012
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.log4j.Logger
+import kafka.log._
+import kafka.network._
+import kafka.message._
+import kafka.api._
+import kafka.common.ErrorMapping
+import kafka.utils.SystemTime
+import kafka.utils.Logging
+import java.io.IOException
+
+/**
+ * Logic to handle the various Kafka requests
+ */
+class KafkaApis(val logManager: LogManager) extends Logging {
+  
+  private val requestLogger = Logger.getLogger("kafka.request.logger")
+
+  def handle(receive: Receive): Option[Send] = { 
+    val apiId = receive.buffer.getShort() 
+    apiId match {
+        case RequestKeys.Produce => handleProducerRequest(receive)
+        case RequestKeys.Fetch => handleFetchRequest(receive)
+        case RequestKeys.MultiFetch => handleMultiFetchRequest(receive)
+        case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
+        case RequestKeys.Offsets => handleOffsetRequest(receive)
+        case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
+    }
+  }
+
+  def handleProducerRequest(receive: Receive): Option[Send] = {
+    val sTime = SystemTime.milliseconds
+    val request = ProducerRequest.readFrom(receive.buffer)
+
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Producer request " + request.toString)
+    handleProducerRequest(request, "ProduceRequest")
+    debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
+    None
+  }
+
+  def handleMultiProducerRequest(receive: Receive): Option[Send] = {
+    val request = MultiProducerRequest.readFrom(receive.buffer)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Multiproducer request " + request.toString)
+    request.produces.map(handleProducerRequest(_, "MultiProducerRequest"))
+    None
+  }
+
+  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
+    val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
+    try {
+      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
+      trace(request.messages.sizeInBytes + " bytes written to logs.")
+    }
+    catch {
+      case e =>
+        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+        e match {
+          case _: IOException => 
+            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+            System.exit(1)
+          case _ =>
+        }
+        throw e
+    }
+    None
+  }
+
+  def handleFetchRequest(request: Receive): Option[Send] = {
+    val fetchRequest = FetchRequest.readFrom(request.buffer)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Fetch request " + fetchRequest.toString)
+    Some(readMessageSet(fetchRequest))
+  }
+  
+  def handleMultiFetchRequest(request: Receive): Option[Send] = {
+    val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Multifetch request")
+    multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString))
+    var responses = multiFetchRequest.fetches.map(fetch =>
+        readMessageSet(fetch)).toList
+    
+    Some(new MultiMessageSetSend(responses))
+  }
+
+  private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = {
+    var  response: MessageSetSend = null
+    try {
+      trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
+      val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
+      if (log != null)
+        response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+      else
+        response = new MessageSetSend()
+    }
+    catch {
+      case e =>
+        error("error when processing request " + fetchRequest, e)
+        response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    response
+  }
+
+  def handleOffsetRequest(request: Receive): Option[Send] = {
+    val offsetRequest = OffsetRequest.readFrom(request.buffer)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Offset request " + offsetRequest.toString)
+    val offsets = logManager.getOffsets(offsetRequest)
+    val response = new OffsetArraySend(offsets)
+    Some(response)
+  }
+}
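
For illustration only (not from the commit), a self-contained sketch of the framing that handle() above assumes: the first two bytes of every request buffer carry the api id that selects the handler, and the rest of the buffer is the payload that the matching readFrom() parses. The id 0 below is an arbitrary example value, not necessarily a real RequestKeys constant.

    import java.nio.ByteBuffer

    object ApiIdFramingSketch {
      def main(args: Array[String]) {
        val buffer = ByteBuffer.allocate(6)
        buffer.putShort(0.toShort)   // api id; the real values come from kafka.api.RequestKeys
        buffer.putInt(42)            // stand-in for the serialized request body
        buffer.flip()

        val apiId = buffer.getShort()  // the same leading read that KafkaApis.handle performs
        println("api id = " + apiId + ", payload bytes remaining = " + buffer.remaining())
      }
    }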

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1231298&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Fri Jan 13 21:15:29 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import org.apache.log4j.Logger
+import kafka.network._
+import kafka.utils._
+
+/**
+ * Thread that answers kafka requests.
+ */
+class KafkaRequestHandler(val requestChannel: RequestChannel, val handle: (Receive) => Option[Send]) extends Runnable with Logging { 
+  
+  def run() { 
+    while(true) { 
+      val req = requestChannel.receiveRequest()
+      trace("Processor " + Thread.currentThread.getName + " got request " + req)
+      if(req == RequestChannel.AllDone)
+        return
+      handle(req.request) match { 
+        case Some(send) => { 
+          val resp = new RequestChannel.Response(processor = req.processor, 
+                                                 requestKey = req.requestKey, 
+						 response = send, 
+						 start = req.start, 
+						 ellapsed = -1)
+          requestChannel.sendResponse(resp)
+          trace("Processor " + Thread.currentThread.getName + " sent response " + resp)
+        }
+        case None =>
+      }
+    }
+  }
+
+  def shutdown() {
+    requestChannel.sendRequest(RequestChannel.AllDone)
+  }
+  
+}
+
+/**
+ * Pool of request handling threads.
+ */
+class KafkaRequestHandlerPool(val requestChannel: RequestChannel, val handler: (Receive) => Option[Send], numThreads: Int) { 
+  
+  val threads = new Array[Thread](numThreads)
+  val runnables = new Array[KafkaRequestHandler](numThreads)
+  for(i <- 0 until numThreads) { 
+    runnables(i) = new KafkaRequestHandler(requestChannel, handler)
+    threads(i) = new Thread(runnables(i), "kafka-request-handler-" + i)
+    threads(i).setDaemon(true)
+    threads(i).start()
+  }
+  
+  def shutdown() {
+    for(handler <- runnables)
+      handler.shutdown
+    for(thread <- threads)
+      thread.join
+  }
+  
+}
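
A sketch (not part of this revision) of how the three classes added here could be wired together; the LogManager argument, processor/thread counts, and queue size are placeholders, and construction of the socket server that feeds the channel is outside this diff:

    import kafka.log.LogManager
    import kafka.network.RequestChannel
    import kafka.server.{KafkaApis, KafkaRequestHandlerPool}

    object ServerWiringSketch {
      def wire(logManager: LogManager) {
        val requestChannel = new RequestChannel(numProcessors = 3, queueSize = 500)
        val apis = new KafkaApis(logManager)

        // Handler threads pull requests off the channel, invoke KafkaApis.handle,
        // and queue any resulting Send on the owning processor's response queue.
        val handlerPool = new KafkaRequestHandlerPool(requestChannel, apis.handle, numThreads = 8)

        // The socket server would call requestChannel.sendRequest(...) for each
        // incoming request and poll requestChannel.receiveResponse(processorId).

        // Shutdown enqueues one AllDone sentinel per handler thread and joins them.
        handlerPool.shutdown()
      }
    }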


