kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-992 Double Check on Broker Registration to Avoid False NodeExist Exception; reviewed by Neha Narkhede and Swapnil Ghike
Date Mon, 05 Aug 2013 17:06:04 GMT
Updated Branches:
  refs/heads/0.8 81a9f6a68 -> f3f8fa5c8


KAFKA-992 Double Check on Broker Registration to Avoid False NodeExist Exception; reviewed by Neha Narkhede and Swapnil Ghike


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f3f8fa5c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f3f8fa5c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f3f8fa5c

Branch: refs/heads/0.8
Commit: f3f8fa5c8c17ade2205ef0f80d9092cae640327d
Parents: 81a9f6a
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Mon Aug 5 10:05:44 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon Aug 5 10:05:55 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/KafkaZooKeeper.scala     |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 40 ++++++++++++++++----
 2 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f8fa5c/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 0e6c656..553640f 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -48,7 +48,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
       else
         config.hostName 
     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort)
+    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, config.zkSessionTimeoutMs, jmxPort)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f8fa5c/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d53d511..0072a1a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -181,19 +181,43 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
+    val timestamp = "\"" + SystemTime.milliseconds.toString + "\""
     val brokerInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
-                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString),
+                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
                                                    valueInQuotes = false))
-    try {
-      createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
-    } catch {
-      case e: ZkNodeExistsException =>
-        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
+
+    while (true) {
+      try {
+        createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
+
+        info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
+        return
+      } catch {
+        case e: ZkNodeExistsException => {
+          // An ephemeral node may still exist even after its corresponding session has expired
+          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
+          // and hence the write succeeds without ZkNodeExistsException
+          ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + id)._1 match {
+            case Some(brokerZKString) => {
+              val broker = Broker.createBroker(id, brokerZKString)
+              if (broker.host == host && broker.port == port) {
+                info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString)
+                  + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
+                Thread.sleep(timeout)
+              } else {
+                // otherwise, throw the runtime exception
+                throw new RuntimeException("Another broker [%s:%s] other than the current broker [%s:%s] is already registered on the path %s."
+                  .format(broker.host, broker.port, host, port, brokerIdPath))
+              }
+            }
+            case None => // the node disappeared; retry creating the ephemeral node immediately
+          }
+        }
+      }
     }
-    info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {


Mime
View raw message