kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2353; SocketServer catch exception and close connection properly; reviewed by Gwen Shapira, Ismael Juma and Guozhang Wang
Date Fri, 24 Jul 2015 00:19:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 66c8647d8 -> 520a8135f


KAFKA-2353; SocketServer catch exception and close connection properly; reviewed by Gwen Shapira, Ismael Juma and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/520a8135
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/520a8135
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/520a8135

Branch: refs/heads/trunk
Commit: 520a8135f4820ee92e63427c5dbb2d5356e6a473
Parents: 66c8647
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Jul 23 17:19:16 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jul 23 17:19:16 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/network/SocketServer.scala | 104 +++++++++++--------
 1 file changed, 60 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/520a8135/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 91319fa..dbe784b 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.protocol.types.SchemaException
 import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
 
 import scala.collection._
+import scala.util.control.{NonFatal, ControlThrowable}
 
 /**
  * An NIO socket server. The threading model is
@@ -357,49 +358,57 @@ private[kafka] class Processor(val id: Int,
   override def run() {
     startupComplete()
     while(isRunning) {
-      // setup any new connections that have been queued up
-      configureNewConnections()
-      // register any new responses for writing
-      processNewResponses()
-
       try {
-        selector.poll(300)
-      } catch {
-        case e @ (_: IllegalStateException | _: IOException) => {
-          error("Closing processor %s due to illegal state or IO exception".format(id))
-          swallow(closeAll())
-          shutdownComplete()
-          throw e
-        }
-        case e: InvalidReceiveException =>
-          // Log warning and continue since Selector already closed the connection
-          warn("Connection was closed due to invalid receive. Processor will continue handling other connections")
-      }
-      collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(
receive => {
+        // setup any new connections that have been queued up
+        configureNewConnections()
+        // register any new responses for writing
+        processNewResponses()
+
         try {
-          val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT)
-          requestChannel.sendRequest(req)
+          selector.poll(300)
         } catch {
-          case e @ (_: InvalidRequestException | _: SchemaException) => {
-            // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
-            error("Closing socket for " + receive.source + " because of error", e)
-            selector.close(receive.source)
+          case e @ (_: IllegalStateException | _: IOException) => {
+            error("Closing processor %s due to illegal state or IO exception".format(id))
+            swallow(closeAll())
+            shutdownComplete()
+            throw e
           }
+          case e: InvalidReceiveException =>
+            // Log warning and continue since Selector already closed the connection
+            warn("Connection was closed due to invalid receive. Processor will continue handling other connections")
         }
-        selector.mute(receive.source)
-      })
-
-      collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach(
send => {
-        val resp = inflightResponses.remove(send.destination()).get
-        resp.request.updateRequestMetrics()
-        selector.unmute(send.destination())
-      })
+        collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive => {
+          try {
+            val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT)
+            requestChannel.sendRequest(req)
+          } catch {
+            case e @ (_: InvalidRequestException | _: SchemaException) => {
+              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
+              error("Closing socket for " + receive.source + " because of error", e)
+              selector.close(receive.source)
+            }
+          }
+          selector.mute(receive.source)
+        })
+
+        collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach(send => {
+          val resp = inflightResponses.remove(send.destination()).get
+          resp.request.updateRequestMetrics()
+          selector.unmute(send.destination())
+        })
+      } catch {
+        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
+        // letting a processor exit might cause bigger impact on the broker. Usually the exceptions thrown would
+        // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
+        // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
+        case e : ControlThrowable => throw e
+        case e : Throwable =>
+          error("Processor got uncaught exception.", e)
+      }
     }
 
-
-
     debug("Closing selector - processor " + id)
-    closeAll()
+    swallowError(closeAll())
     shutdownComplete()
   }
 
@@ -426,8 +435,6 @@ private[kafka] class Processor(val id: Int,
             selector.close(curr.request.connectionId)
           }
         }
-
-
       } finally {
         curr = requestChannel.receiveResponse(id)
       }
@@ -448,13 +455,22 @@ private[kafka] class Processor(val id: Int,
   private def configureNewConnections() {
     while(!newConnections.isEmpty) {
       val channel = newConnections.poll()
-      debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
-      val localHost = channel.socket().getLocalAddress.getHostAddress
-      val localPort = channel.socket().getLocalPort
-      val remoteHost = channel.socket().getInetAddress.getHostAddress
-      val remotePort = channel.socket().getPort
-      val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
-      selector.register(connectionId, channel)
+      try {
+        debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
+        val localHost = channel.socket().getLocalAddress.getHostAddress
+        val localPort = channel.socket().getLocalPort
+        val remoteHost = channel.socket().getInetAddress.getHostAddress
+        val remotePort = channel.socket().getPort
+        val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
+        selector.register(connectionId, channel)
+      } catch {
+        // We explicitly catch all non fatal exceptions and close the socket to avoid socket leak. The other
+        // throwables will be caught in processor and logged as uncaught exception.
+        case NonFatal(e) =>
+          // need to close the channel here to avoid socket leak.
+          close(channel)
+          error("Processor " + id + " closed connection from " + channel.getRemoteAddress, e)
+      }
     }
   }
 


Mime
View raw message