kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: kafka-1738; Partitions for topic not created after restart from forced shutdown; patched by Jun Rao; reviewed by Neha Narkhede
Date Fri, 07 Nov 2014 18:46:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cbdcd5f10 -> f20e5108a


kafka-1738; Partitions for topic not created after restart from forced shutdown; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f20e5108
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f20e5108
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f20e5108

Branch: refs/heads/trunk
Commit: f20e5108a271e1fc37ca6752773efa74e2fef67f
Parents: cbdcd5f
Author: Jun Rao <junrao@gmail.com>
Authored: Fri Nov 7 10:46:21 2014 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Nov 7 10:46:21 2014 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/controller/ControllerChannelManager.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f20e5108/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ecbfa0f..eb492f0 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -130,10 +130,11 @@ class RequestSendThread(val controllerId: Int,
          // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
           try {
             channel.send(request)
+            receive = channel.receive()
             isSendSuccessful = true
           } catch {
             case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
-              error(("Controller %d epoch %d failed to send request %s to broker %s. " +
+              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                 "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                 request.toString, toBroker.toString()), e)
               channel.disconnect()
@@ -143,7 +144,6 @@ class RequestSendThread(val controllerId: Int,
               Utils.swallow(Thread.sleep(300))
           }
         }
-        receive = channel.receive()
         var response: RequestOrResponse = null
         request.requestId.get match {
           case RequestKeys.LeaderAndIsrKey =>
@@ -162,7 +162,7 @@ class RequestSendThread(val controllerId: Int,
       }
     } catch {
       case e: Throwable =>
-        warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
+        error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
         // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
         channel.disconnect()
     }

