kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject [1/3] git commit: KAFKA-992 follow-up: Fix the ZooKeeper de-registration issue for the controller and consumer; reviewed by Neha Narkhede
Date Fri, 09 Aug 2013 22:51:35 GMT
Updated Branches:
  refs/heads/0.8 e7a546b67 -> 1db824ed2


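KAFKA-992 follow-up: Fix the ZooKeeper de-registration issue for the controller and consumer. When an ephemeral registration node written in an earlier, expired session still exists, the consumer and the controller elector now back off for the ZooKeeper session timeout and retry the registration; the controller path now stores JSON (version, brokerid, timestamp), and readers still accept the old plain broker-id format. Reviewed by Neha Narkhede.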


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/81c49bbd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/81c49bbd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/81c49bbd

Branch: refs/heads/0.8
Commit: 81c49bbdae5e490f9d5dc7b042ee60e617fbb22b
Parents: e7a546b
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Thu Aug 8 22:06:03 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu Aug 8 22:06:12 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 30 +++++-
 .../kafka/controller/KafkaController.scala      |  3 +-
 .../kafka/server/ZookeeperLeaderElector.scala   | 97 ++++++++++++++++----
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 22 ++++-
 .../unit/kafka/server/LeaderElectionTest.scala  |  2 +-
 5 files changed, 126 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/81c49bbd/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0ca2850..17977e7 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -213,13 +213,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // this API is used by unit tests only
   def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
 
-  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount:
TopicCount) = {
+  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount:
TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
+    val timestamp = SystemTime.milliseconds.toString
     val consumerRegistrationInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription"
-> topicCount.dbString), valueInQuotes = false)
-                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern),
valueInQuotes = true))
-    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString,
consumerRegistrationInfo)
-    info("end registering consumer " + consumerIdString + " in ZK")
+                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern,
"timestamp" -> timestamp), valueInQuotes = true))
+
+    while (true) {
+      try {
+        createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString,
consumerRegistrationInfo)
+
+        info("end registering consumer " + consumerIdString + " in ZK")
+        return
+      } catch {
+        case e: ZkNodeExistsException => {
+          // An ephemeral node may still exist even after its corresponding session has expired
+          // due to a Zookeeper bug, in this case we need to retry writing until the previous
node is deleted
+          // and hence the write succeeds without ZkNodeExistsException
+          ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1
match {
+            case Some(consumerZKString) => {
+              info("I wrote this conflicted ephemeral node a while back in a different session,
"
+                + "hence I will backoff for this node to be deleted by Zookeeper after session
timeout and retry")
+              Thread.sleep(config.zkSessionTimeoutMs)
+            }
+            case None => // the node disappeared; retry creating the ephemeral node immediately
+          }
+        }
+      }
+    }
   }
 
   private def sendShutdownToAllQueues() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/81c49bbd/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index c87caab..800f900 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,6 +37,7 @@ import scala.Some
 import kafka.common.TopicAndPartition
 
 class ControllerContext(val zkClient: ZkClient,
+                        val zkSessionTimeout: Int,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
@@ -83,7 +84,7 @@ object KafkaController {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with
KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
-  val controllerContext = new ControllerContext(zkClient)
+  val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,
onControllerFailover,

http://git-wip-us.apache.org/repos/asf/kafka/blob/81c49bbd/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 574922b..6016bd5 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,10 +17,11 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.Logging
+import kafka.utils.{Json, Utils, SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
+import kafka.common.KafkaException
 
 /**
  * This class handles zookeeper based leader election based on an ephemeral path. The election
module does not handle
@@ -46,23 +47,62 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
 
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
-    try {
-      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
-      info(brokerId + " successfully elected as leader")
-      leaderId = brokerId
-      onBecomingLeader()
-    } catch {
-      case e: ZkNodeExistsException =>
-        // If someone else has written the path, then
-        val data: String = controllerContext.zkClient.readData(electionPath, true)
-        debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
-        if (data != null) {
-          leaderId = data.toInt
-        }
-      case e2 =>
-        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
-        resign()
-    }
+    val timestamp = SystemTime.milliseconds.toString
+    val electString =
+      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid"
-> brokerId.toString), valueInQuotes = false)
+        ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
+
+    var electNotDone = true
+    do {
+      electNotDone = false
+      try {
+        createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString)
+
+        info(brokerId + " successfully elected as leader")
+        leaderId = brokerId
+        onBecomingLeader()
+      } catch {
+        case e: ZkNodeExistsException =>
+          readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+          // If someone else has written the path, then read the broker id
+            case Some(controllerString) =>
+              try {
+                Json.parseFull(controllerString) match {
+                  case Some(m) =>
+                    val controllerInfo = m.asInstanceOf[Map[String, Any]]
+                    leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
+                    if (leaderId != brokerId) {
+                      info("Broker %d was elected as leader instead of broker %d".format(leaderId,
brokerId))
+                    } else {
+                      info("I wrote this conflicted ephemeral node a while back in a different
session, "
+                        + "hence I will retry")
+                      electNotDone = true
+                      Thread.sleep(controllerContext.zkSessionTimeout)
+                    }
+                  case None =>
+                    error("Error while reading leader info %s on broker %d".format(controllerString,
brokerId))
+                    resign()
+                }
+              } catch {
+                case t =>
+                  // It may be due to an incompatible controller register version
+                  info("Failed to parse the controller info as json. " +
+                    "Probably this controller is still using the old format [%s] of storing
the broker id in the zookeeper path".format(controllerString))
+                  try {
+                    leaderId = controllerString.toInt
+                    info("Broker %d was elected as leader instead of broker %d".format(leaderId,
brokerId))
+                  } catch {
+                    case t => throw new KafkaException("Failed to parse the leader info
[%s] from zookeeper. This is neither the new or the old format.", t)
+                  }
+              }
+          }
+          // If the node disappear, retry immediately
+        case e2 =>
+          error("Error while electing or becoming leader on broker %d".format(brokerId),
e2)
+          resign()
+      }
+    } while (electNotDone)
+
     amILeader
   }
 
@@ -88,8 +128,25 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       controllerContext.controllerLock synchronized {
-        leaderId = data.toString.toInt
-        info("New leader is %d".format(leaderId))
+        try {
+          Json.parseFull(data.toString) match {
+            case Some(m) =>
+              val controllerInfo = m.asInstanceOf[Map[String, Any]]
+              leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
+              info("New leader is %d".format(leaderId))
+            case None =>
+              error("Error while reading the leader info %s".format(data.toString))
+          }
+        } catch {
+          case t =>
+            // It may be due to an incompatible controller register version
+            try {
+              leaderId = data.toString.toInt
+              info("New leader is %d".format(leaderId))
+            } catch {
+              case t => throw new KafkaException("Failed to parse the leader info from
zookeeper: " + data, t)
+            }
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/81c49bbd/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0072a1a..8440d94 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -54,7 +54,25 @@ object ZkUtils extends Logging {
 
   def getController(zkClient: ZkClient): Int= {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
-      case Some(controller) => controller.toInt
+      case Some(controller) =>
+        try {
+          Json.parseFull(controller) match {
+            case Some(m) =>
+              val controllerInfo = m.asInstanceOf[Map[String, Any]]
+              controllerInfo.get("brokerid").get.asInstanceOf[Int]
+            case None => throw new KafkaException("Failed to parse the controller info
json [%s] from zookeeper.".format(controller))
+          }
+        } catch {
+          case t =>
+            // It may be due to an incompatible controller register version
+            info("Failed to parse the controller info as json. " +
+              "Probably this controller is still using the old format [%s] of storing the
broker id in the zookeeper path".format(controller))
+            try {
+              controller.toInt
+            } catch {
+              case t => throw new KafkaException("Failed to parse the controller info
[%s] from zookeeper. This is neither the new or the old format.", t)
+            }
+        }
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }
@@ -204,7 +222,7 @@ object ZkUtils extends Logging {
             case Some(brokerZKString) => {
               val broker = Broker.createBroker(id, brokerZKString)
               if (broker.host == host && broker.port == port) {
-                info("I wrote this conflicted ephemeral node [%s] a while back in a different
session, ".format(brokerZKString)
+                info("I wrote this conflicted ephemeral node a while back in a different
session, "
                   + "hence I will backoff for this node to be deleted by Zookeeper after
session timeout and retry")
                 Thread.sleep(timeout)
               } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/81c49bbd/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index c4328f0..70e4b51 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -124,7 +124,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness
{
     val controllerId = 2
     val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
-    val controllerContext = new ControllerContext(zkClient)
+    val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
     controllerChannelManager.startup()


Mime
View raw message