kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-2351; Catch all exceptions in socket server's acceptor; reviewed by Grant Henke, Joel Koshy, Jiangjie Qin, Jun Rao
Date Wed, 26 Aug 2015 00:51:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1d2ae89c5 -> 5d453ba6d


KAFKA-2351; Catch all exceptions in socket server's acceptor; reviewed by Grant Henke, Joel
Koshy, Jiangjie Qin, Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d453ba6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d453ba6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d453ba6

Branch: refs/heads/trunk
Commit: 5d453ba6d4654efc93e6b2d2f069139afa571b94
Parents: 1d2ae89
Author: Mayuresh Gharat <gharatmayuresh15@gmail.com>
Authored: Tue Aug 25 17:50:16 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue Aug 25 17:50:49 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/network/SocketServer.scala | 60 ++++++++++++--------
 1 file changed, 36 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d453ba6/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 649812d..97b84db 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -253,34 +253,46 @@ private[kafka] class Acceptor(val host: String,
     serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
     startupComplete()
     var currentProcessor = processorBeginIndex
-    while(isRunning) {
-      val ready = nioSelector.select(500)
-      if(ready > 0) {
-        val keys = nioSelector.selectedKeys()
-        val iter = keys.iterator()
-        while(iter.hasNext && isRunning) {
-          var key: SelectionKey = null
-          try {
-            key = iter.next
-            iter.remove()
-            if(key.isAcceptable)
-              accept(key, processors(currentProcessor))
-            else
-               throw new IllegalStateException("Unrecognized key state for acceptor thread.")
-
-            // round robin to the next processor thread
-            currentProcessor = (currentProcessor + 1) % processorEndIndex
-            if (currentProcessor < processorBeginIndex) currentProcessor = processorBeginIndex
-          } catch {
-            case e: Throwable => error("Error while accepting connection", e)
+    try {
+      while (isRunning) {
+        try {
+          val ready = nioSelector.select(500)
+          if (ready > 0) {
+            val keys = nioSelector.selectedKeys()
+            val iter = keys.iterator()
+            while (iter.hasNext && isRunning) {
+              var key: SelectionKey = null
+              try {
+                key = iter.next
+                iter.remove()
+                if (key.isAcceptable)
+                  accept(key, processors(currentProcessor))
+                else
+                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
+
+                // round robin to the next processor thread
+                currentProcessor = (currentProcessor + 1) % processorEndIndex
+                if (currentProcessor < processorBeginIndex) currentProcessor = processorBeginIndex
+              } catch {
+                case e: Throwable => error("Error while accepting connection", e)
+              }
+            }
           }
         }
+        catch {
+          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions
due
+          // to a select operation on a specific channel or a bad request. We don't want
the
+          // the broker to stop responding to requests from other clients in these scenarios.
+          case e: ControlThrowable => throw e
+          case e: Throwable => error("Error occurred", e)
+        }
       }
+    } finally {
+      debug("Closing server socket and selector.")
+      swallowError(serverChannel.close())
+      swallowError(nioSelector.close())
+      shutdownComplete()
     }
-    debug("Closing server socket and selector.")
-    swallowError(serverChannel.close())
-    swallowError(nioSelector.close())
-    shutdownComplete()
   }
 
   /*


Mime
View raw message