kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: kafka-907; controller needs to close socket channel to brokers on exception ; patched by Jun Rao; reviewed by Neha Narkhede
Date Wed, 22 May 2013 23:23:35 GMT
Updated Branches:
  refs/heads/0.8 e93937c88 -> 32cd8994b


kafka-907; controller needs to close socket channel to brokers on exception ; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32cd8994
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32cd8994
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32cd8994

Branch: refs/heads/0.8
Commit: 32cd8994bf35b65a8053e0caea9a7710cc889df7
Parents: e93937c
Author: Jun Rao <junrao@gmail.com>
Authored: Wed May 22 16:23:21 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed May 22 16:23:21 2013 -0700

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala      |    6 ++++--
 1 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/32cd8994/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 0c41d1d..38b8674 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -122,6 +122,7 @@ class RequestSendThread(val controllerId: Int,
 
     try{
       lock synchronized {
+        channel.connect() // establish a socket connection if needed
         channel.send(request)
         receive = channel.receive()
         var response: RequestOrResponse = null
@@ -142,8 +143,9 @@ class RequestSendThread(val controllerId: Int,
       }
     } catch {
       case e =>
-        // log it and let it go. Let controller shut it down.
-        debug("Exception occurs", e)
+        warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
+        // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
+        channel.disconnect()
     }
   }
 }
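
The pattern this patch introduces (connect if needed before each send, disconnect on any failure so the next attempt starts from a fresh socket) can be sketched in isolation roughly as follows. This is a minimal illustration, not the Kafka code: `Channel`, `FlakyChannel`, `SendLoop`, and `sendOnce` are hypothetical names, requests are plain strings, and the sketch catches only non-fatal exceptions where the patch uses a bare `case e =>`.

```scala
import java.io.IOException
import scala.util.control.NonFatal

// Hypothetical stand-in for the controller's blocking channel; only the
// operations that appear in the diff are modelled.
trait Channel {
  def isConnected: Boolean
  def connect(): Unit      // assumed to be a no-op when already connected
  def disconnect(): Unit
  def send(request: String): Unit
  def receive(): String
}

// In-memory fake that throws on chosen send attempts, for illustration only.
class FlakyChannel(failOn: Set[Int]) extends Channel {
  private var connected = false
  private var attempts = 0
  var connectCount = 0     // number of times a new connection was established

  def isConnected: Boolean = connected
  def connect(): Unit = if (!connected) { connected = true; connectCount += 1 }
  def disconnect(): Unit = { connected = false }
  def send(request: String): Unit = {
    attempts += 1
    if (failOn.contains(attempts)) throw new IOException("simulated socket timeout")
  }
  def receive(): String = "ok"
}

object SendLoop {
  private val lock = new Object

  // Same shape as the patched block: connect if needed, send, receive, and
  // drop the connection on failure so the next call reconnects.
  def sendOnce(channel: Channel, request: String): Option[String] =
    try {
      lock.synchronized {
        channel.connect()
        channel.send(request)
        Some(channel.receive())
      }
    } catch {
      case NonFatal(_) =>
        // After a socket error the channel is treated as unusable.
        channel.disconnect()
        None
    }
}
```

With `new FlakyChannel(Set(1))`, the first `sendOnce` call fails and leaves the channel disconnected; the second call reconnects and succeeds. Without the `disconnect()` in the handler, the fake would still report itself as connected after the failure, which is the stale-socket situation the commit message describes.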

