kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-1387: Kafka getting stuck creating ephemeral node it has already created when two zookeeper sessions are established in a very short period of time
Date Thu, 24 Sep 2015 17:14:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 54dd6e3ad -> 1daf6ac51


KAFKA-1387: Kafka getting stuck creating ephemeral node it has already created when two ZooKeeper sessions are established in a very short period of time

This is a patch to work around the problem discussed in the KAFKA-1387 JIRA. The tests do not
pass on my machine when I run them all together, but they do pass when I run them individually,
which indicates that some state is leaking from one test to the next. I still need to track this
down and test the change further. I wanted to open this PR now so that the review can start.

Author: flavio junqueira <fpj@apache.org>
Author: fpj <fpj@apache.org>
Author: Flavio Junqueira <fpj@apache.org>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>,
Jun Rao <junrao@gmail.com>

Closes #178 from fpj/1387


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1daf6ac5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1daf6ac5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1daf6ac5

Branch: refs/heads/trunk
Commit: 1daf6ac510602ee27210f069bbe7415d4610fbea
Parents: 54dd6e3
Author: flavio junqueira <fpj@apache.org>
Authored: Thu Sep 24 10:14:23 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Sep 24 10:14:23 2015 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   |  15 +-
 .../kafka/controller/KafkaController.scala      |   7 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   |  12 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  17 +-
 .../kafka/server/ZookeeperLeaderElector.scala   |  11 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 232 ++++++++++++++-----
 .../test/scala/unit/kafka/admin/AdminTest.scala |   6 +-
 .../unit/kafka/admin/TopicCommandTest.scala     |   4 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   6 +-
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |  98 ++++++++
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |   7 +-
 12 files changed, 326 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e42d104..2027ec8 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -36,13 +36,12 @@ import kafka.utils.CoreUtils.inLock
 import kafka.utils.ZkUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient,
ZkConnection}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
 import scala.collection._
 import scala.collection.JavaConversions._
 
-
 /**
  * This class handles the consumers interaction with zookeeper
  *
@@ -90,6 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val rebalanceLock = new Object
   private var fetcher: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
+  private var zkConnection : ZkConnection = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
   private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
@@ -178,7 +178,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private def connectZk() {
     info("Connecting to zookeeper instance at " + config.zkConnect)
-    zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
+    val (client, connection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs)
+    zkClient = client
+    zkConnection = connection
   }
 
   // Blocks until the offset manager is located and a channel is established to it.
@@ -261,9 +263,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val timestamp = SystemTime.milliseconds.toString
     val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" ->
topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
                                                   "timestamp" -> timestamp))
+    val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
+                                                    consumerRegistryDir + "/" + consumerIdString,

+                                                    consumerRegistrationInfo,
+                                                    zkConnection.getZookeeper)
+    zkWatchedEphemeral.create()
 
-    createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/"
+ consumerIdString, consumerRegistrationInfo, null,
-                                                 (consumerZKString, consumer) => true,
config.zkSessionTimeoutMs)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 54a31c6..a7b44ca 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,13 +37,14 @@ import kafka.utils.CoreUtils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient,
ZkConnection}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.locks.ReentrantLock
 import kafka.server._
 import kafka.common.TopicAndPartition
 
 class ControllerContext(val zkClient: ZkClient,
+                        val zkConnection: ZkConnection,
                         val zkSessionTimeout: Int) {
   var controllerChannelManager: ControllerChannelManager = null
   val controllerLock: ReentrantLock = new ReentrantLock()
@@ -153,11 +154,11 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState,
time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with
KafkaMetricsGroup {
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection: ZkConnection,
val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String]
= None) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
-  val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
+  val controllerContext = new ControllerContext(zkClient, zkConnection, config.zkSessionTimeoutMs)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,
onControllerFailover,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index e6e270b..16760d4 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -21,7 +21,7 @@ import kafka.cluster.EndPoint
 import kafka.utils._
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient, ZkConnection}
 import java.net.InetAddress
 
 
@@ -35,8 +35,8 @@ import java.net.InetAddress
  */
 class KafkaHealthcheck(private val brokerId: Int,
                        private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
-                       private val zkSessionTimeoutMs: Int,
-                       private val zkClient: ZkClient) extends Logging {
+                       private val zkClient: ZkClient,
+                       private val zkConnection: ZkConnection) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   val sessionExpireListener = new SessionExpireListener
@@ -62,7 +62,7 @@ class KafkaHealthcheck(private val brokerId: Int,
     // only PLAINTEXT is supported as default
     // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered
and older clients will be unable to connect
     val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
-    ZkUtils.registerBrokerInZk(zkClient, brokerId, plaintextEndpoint.host, plaintextEndpoint.port,
updatedEndpoints, zkSessionTimeoutMs, jmxPort)
+    ZkUtils.registerBrokerInZk(zkClient, zkConnection, brokerId, plaintextEndpoint.host,
plaintextEndpoint.port, updatedEndpoints, jmxPort)
   }
 
   /**
@@ -71,9 +71,7 @@ class KafkaHealthcheck(private val brokerId: Int,
    */
   class SessionExpireListener() extends IZkStateListener {
     @throws(classOf[Exception])
-    def handleStateChanged(state: KeeperState) {
-      // do nothing, since zkclient will do reconnect for us.
-    }
+    def handleStateChanged(state: KeeperState) {}
 
     /**
      * Called after the zookeeper session has expired and a new session has been created.
You would have to re-create

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5cc9c5d..ba3333d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.common.utils.AppInfoParser
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
-import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.{EndPoint, Broker}
 import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
@@ -129,6 +129,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
 
   var zkClient: ZkClient = null
+  var zkConnection: ZkConnection = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new
File(logDir + File.separator +brokerMetaPropsFile)))).toMap
@@ -164,7 +165,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaScheduler.startup()
 
         /* setup zookeeper */
-        zkClient = initZk()
+        val (client, connection) = initZk()
+        zkClient = client
+        zkConnection = connection
 
         /* start log manager */
         logManager = createLogManager(zkClient, brokerState)
@@ -183,7 +186,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkClient, brokerState, kafkaMetricsTime,
metrics, threadNamePrefix)
+        kafkaController = new KafkaController(config, zkClient, zkConnection, brokerState,
kafkaMetricsTime, metrics, threadNamePrefix)
         kafkaController.startup()
 
         /* start kafka coordinator */
@@ -220,7 +223,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           else
             (protocol, endpoint)
         }
-        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, config.zkSessionTimeoutMs,
zkClient)
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkClient, zkConnection)
         kafkaHealthcheck.startup()
 
         /* register broker metrics */
@@ -242,7 +245,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     }
   }
 
-  private def initZk(): ZkClient = {
+  private def initZk(): (ZkClient, ZkConnection) = {
     info("Connecting to zookeeper on " + config.zkConnect)
 
     val chroot = {
@@ -260,9 +263,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
       zkClientForChrootCreation.close()
     }
 
-    val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
+    val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(config.zkConnect,
config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
     ZkUtils.setupCommonPaths(zkClient)
-    zkClient
+    (zkClient, zkConnection)
   }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 2b36b2d..b283e0a 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -18,7 +18,7 @@ package kafka.server
 
 import kafka.utils.ZkUtils._
 import kafka.utils.CoreUtils._
-import kafka.utils.{Json, SystemTime, Logging}
+import kafka.utils.{Json, SystemTime, Logging, ZKCheckedEphemeral}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
@@ -56,7 +56,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
        case None => -1
     }
   }
-    
+
   def elect: Boolean = {
     val timestamp = SystemTime.milliseconds.toString
     val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp"
-> timestamp))
@@ -73,9 +73,10 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
     }
 
     try {
-      createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath,
electString, brokerId,
-        (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString)
== leaderId.asInstanceOf[Int],
-        controllerContext.zkSessionTimeout)
+      val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
+                                                      electString,
+                                                      controllerContext.zkConnection.getZookeeper)
+      zkCheckedEphemeral.create()
       info(brokerId + " successfully elected as leader")
       leaderId = brokerId
       onBecomingLeader()

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 74b587e..e1cfa2e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,11 +17,12 @@
 
 package kafka.utils
 
+import java.util.concurrent.CountDownLatch
 import kafka.cluster._
 import kafka.consumer.{ConsumerThreadId, TopicCount}
 import kafka.server.ConfigType
-import org.I0Itec.zkclient.ZkClient
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
+import org.I0Itec.zkclient.{ZkClient,ZkConnection}
+import org.I0Itec.zkclient.exception.{ZkException, ZkNodeExistsException, ZkNoNodeException,
   ZkMarshallingError, ZkBadVersionException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.apache.kafka.common.config.ConfigException
@@ -36,6 +37,14 @@ import kafka.controller.KafkaController
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
 
+import org.apache.zookeeper.AsyncCallback.{DataCallback,StringCallback}
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.ZooDefs.Ids
+import org.apache.zookeeper.ZooKeeper
+
+
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
@@ -195,24 +204,22 @@ object ZkUtils extends Logging {
    * @param timeout
    * @param jmxPort
    */
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, advertisedEndpoints:
immutable.Map[SecurityProtocol, EndPoint], timeout: Int, jmxPort: Int) {
+  def registerBrokerInZk(zkClient: ZkClient, zkConnection: ZkConnection, id: Int, host: String,
port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
 
     val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port,
"endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" ->
jmxPort, "timestamp" -> timestamp))
-    val expectedBroker = new Broker(id, advertisedEndpoints)
-
-    registerBrokerInZk(zkClient, brokerIdPath, brokerInfo, expectedBroker, timeout)
+    registerBrokerInZk(zkClient, zkConnection, brokerIdPath, brokerInfo)
 
     info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
   }
 
-  private def registerBrokerInZk(zkClient: ZkClient, brokerIdPath: String, brokerInfo: String,
expectedBroker: Broker, timeout: Int) {
+  private def registerBrokerInZk(zkClient: ZkClient, zkConnection: ZkConnection, brokerIdPath:
String, brokerInfo: String) {
     try {
-      createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker,
-        (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id,
brokerString).equals(broker.asInstanceOf[Broker]),
-        timeout)
-
+      val zkCheckedEphemeral = new ZKCheckedEphemeral(brokerIdPath,
+                                                      brokerInfo,
+                                                      zkConnection.getZookeeper)
+      zkCheckedEphemeral.create()
     } catch {
       case e: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
@@ -302,47 +309,6 @@ object ZkUtils extends Logging {
   }
 
   /**
-   * Create an ephemeral node with the given path and data.
-   * Throw NodeExistsException if node already exists.
-   * Handles the following ZK session timeout bug:
-   *
-   * https://issues.apache.org/jira/browse/ZOOKEEPER-1740
-   *
-   * Upon receiving a NodeExistsException, read the data from the conflicted path and
-   * trigger the checker function comparing the read data and the expected data,
-   * If the checker function returns true then the above bug might be encountered, back off
and retry;
-   * otherwise re-throw the exception
-   */
-  def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data:
String, expectedCallerData: Any, checker: (String, Any) => Boolean, backoffTime: Int):
Unit = {
-    while (true) {
-      try {
-        createEphemeralPathExpectConflict(zkClient, path, data)
-        return
-      } catch {
-        case e: ZkNodeExistsException => {
-          // An ephemeral node may still exist even after its corresponding session has expired
-          // due to a Zookeeper bug, in this case we need to retry writing until the previous
node is deleted
-          // and hence the write succeeds without ZkNodeExistsException
-          ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
-            case Some(writtenData) => {
-              if (checker(writtenData, expectedCallerData)) {
-                info("I wrote this conflicted ephemeral node [%s] at %s a while back in a
different session, ".format(data, path)
-                  + "hence I will backoff for this node to be deleted by Zookeeper and retry")
-
-                Thread.sleep(backoffTime)
-              } else {
-                throw e
-              }
-            }
-            case None => // the node disappeared; retry creating the ephemeral node immediately
-          }
-        }
-        case e2: Throwable => throw e2
-      }
-    }
-  }
-
-  /**
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
   def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
@@ -809,7 +775,13 @@ object ZkUtils extends Logging {
 
   def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient
= {
     val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
-    zkClient    
+    zkClient
+  }
+
+  def createZkClientAndConnection(zkUrl: String, sessionTimeout: Int, connectionTimeout:
Int): (ZkClient, ZkConnection) = {
+    val zkConnection = new ZkConnection(zkUrl, sessionTimeout)
+    val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer)
+    (zkClient, zkConnection)
   }
 }
 
@@ -892,3 +864,157 @@ object ZkPath {
     client.createPersistentSequential(path, data)
   }
 }
+
+/**
+ * Creates an ephemeral znode at `path` (creating any missing
+ * parents as persistent znodes) and blocks until resolved. If
+ * the create fails with NODEEXISTS, the existing znode is read:
+ * if its ephemeral owner is the current session, the znode is
+ * treated as ours and create() succeeds; otherwise, as with any
+ * other non-OK result, create() throws a ZkException.
+ */
+
+class ZKCheckedEphemeral(path: String,
+                         data: String,
+                         zkHandle: ZooKeeper) extends Logging {
+  private val createCallback = new CreateCallback
+  private val getDataCallback = new GetDataCallback
+  val latch: CountDownLatch = new CountDownLatch(1)
+  var result: Code = Code.OK
+  
+  private class CreateCallback extends StringCallback {
+    def processResult(rc: Int,
+                      path: String,
+                      ctx: Object,
+                      name: String) {
+      Code.get(rc) match {
+        case Code.OK =>
+           setResult(Code.OK)
+        case Code.CONNECTIONLOSS =>
+          // try again
+          createEphemeral
+        case Code.NONODE =>
+          error("No node for path %s (could be the parent missing)".format(path))
+          setResult(Code.NONODE)
+        case Code.NODEEXISTS =>
+          zkHandle.getData(path, false, getDataCallback, null)
+        case Code.SESSIONEXPIRED =>
+          error("Session has expired while creating %s".format(path))
+          setResult(Code.SESSIONEXPIRED)
+        case _ =>
+          warn("ZooKeeper event while creating registration node: %s %s".format(path, Code.get(rc)))
+          setResult(Code.get(rc))
+      }
+    }
+  }
+
+  private class GetDataCallback extends DataCallback {
+      def processResult(rc: Int,
+                        path: String,
+                        ctx: Object,
+                        readData: Array[Byte],
+                        stat: Stat) {
+        Code.get(rc) match {
+          case Code.OK =>
+                if (stat.getEphemeralOwner != zkHandle.getSessionId)
+                  setResult(Code.NODEEXISTS)
+                else
+                  setResult(Code.OK)
+          case Code.NONODE =>
+            info("The ephemeral node [%s] at %s has gone away while reading it, ".format(data,
path))
+            createEphemeral
+          case Code.SESSIONEXPIRED =>
+            error("Session has expired while reading znode %s".format(path))
+            setResult(Code.SESSIONEXPIRED)
+          case _ =>
+            warn("ZooKeeper event while getting znode data: %s %s".format(path, Code.get(rc)))
+            setResult(Code.get(rc))
+        }
+      }
+  }
+  
+  private def createEphemeral() {
+    zkHandle.create(path,
+                    ZKStringSerializer.serialize(data),
+                    Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL,
+                    createCallback,
+                    null)
+  }
+  
+  private def createRecursive(prefix: String, suffix: String) {
+    debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix))
+    if(suffix.isEmpty()) {
+      createEphemeral
+    } else {
+      zkHandle.create(prefix,
+                      new Array[Byte](0),
+                      Ids.OPEN_ACL_UNSAFE,
+                      CreateMode.PERSISTENT,
+                      new StringCallback() {
+                        def processResult(rc : Int,
+                                          path : String,
+                                          ctx : Object,
+                                          name : String) {
+                          Code.get(rc) match {
+                            case Code.OK | Code.NODEEXISTS =>
+                              // Nothing to do
+                            case Code.CONNECTIONLOSS =>
+                              // try again
+                              val suffix = ctx.asInstanceOf[String]
+                              createRecursive(path, suffix)
+                            case Code.NONODE =>
+                              error("No node for path %s (could be the parent missing)".format(path))
+                              setResult(Code.get(rc))
+                            case Code.SESSIONEXPIRED =>
+                              error("Session has expired while creating %s".format(path))
+                              setResult(Code.get(rc))
+                            case _ =>
+                              warn("ZooKeeper event while creating registration node: %s
%s".format(path, Code.get(rc)))
+                              setResult(Code.get(rc))
+                          }
+                        }
+                  },
+                  suffix)
+      // Update prefix and suffix
+      val index = suffix.indexOf('/', 1) match {
+        case -1 => suffix.length
+        case x : Int => x
+      }
+      // Get new prefix
+      val newPrefix = prefix + suffix.substring(0, index)
+      // Get new suffix
+      val newSuffix = suffix.substring(index, suffix.length)
+      createRecursive(newPrefix, newSuffix)
+    }
+  }
+
+  private def setResult(code: Code) {
+    result = code
+    latch.countDown()
+  }
+
+  private def waitUntilResolved(): Code = {
+    latch.await()
+    result
+  }
+
+  def create() {
+    val index = path.indexOf('/', 1) match {
+        case -1 => path.length
+        case x : Int => x
+    }
+    val prefix = path.substring(0, index)
+    val suffix = path.substring(index, path.length)
+    debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix))
+    createRecursive(prefix, suffix)
+    val result = waitUntilResolved()
+    info("Result of znode creation is: %s".format(result))
+    result match {
+      case Code.OK =>
+        // Nothing to do
+      case _ =>
+        throw ZkException.create(KeeperException.create(result))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 9bd8171..2d18069 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -66,7 +66,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
   @Test
   def testManualReplicaAssignment() {
     val brokers = List(0, 1, 2, 3, 4)
-    TestUtils.createBrokersInZk(zkClient, brokers)
+    TestUtils.createBrokersInZk(zkClient, zkConnection, brokers)
 
     // duplicate brokers
     intercept[IllegalArgumentException] {
@@ -117,7 +117,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
       11 -> 1
     )
     val topic = "test"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
+    TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // create leaders for all partitions
@@ -137,7 +137,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
   def testTopicCreationWithCollision() {
     val topic = "test.topic"
     val collidingTopic = "test_topic"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
+    TestUtils.createBrokersInZk(zkClient, zkConnection, List(0, 1, 2, 3, 4))
     // create the topic
     AdminUtils.createTopic(zkClient, topic, 3, 1)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 9bfec72..d4fa0d5 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -36,7 +36,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
     val cleanupVal = "compact"
     // create brokers
     val brokers = List(0, 1, 2)
-    TestUtils.createBrokersInZk(zkClient, brokers)
+    TestUtils.createBrokersInZk(zkClient, zkConnection, brokers)
     // create the topic
     val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
@@ -67,7 +67,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
 
     // create brokers
     val brokers = List(0, 1, 2)
-    TestUtils.createBrokersInZk(zkClient, brokers)
+    TestUtils.createBrokersInZk(zkClient, zkConnection, brokers)
 
     // create the NormalTopic
     val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index ff17830..ac347ef 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -136,7 +136,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
       new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
     }
 
-    val controllerContext = new ControllerContext(zkClient, 6000)
+    val controllerContext = new ControllerContext(zkClient, zkConnection, 6000)
     controllerContext.liveBrokers = brokers.toSet
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, new Metrics)
     controllerChannelManager.startup()

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b01adc8..7f482fb 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Utils._
 
 import collection.mutable.ListBuffer
 
-import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 
 import kafka.server._
 import kafka.producer._
@@ -500,9 +500,9 @@ object TestUtils extends Logging {
     }
   }
 
-  def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
+  def createBrokersInZk(zkClient: ZkClient, zkConnection: ZkConnection, ids: Seq[Int]): Seq[Broker] = {
     val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
-    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, "localhost", 6667, b.endPoints, 6000, jmxPort = -1))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, zkConnection, b.id, "localhost", 6667, b.endPoints, jmxPort = -1))
     brokers
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index f240e89..2bf658c 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -19,7 +19,13 @@ package kafka.zk
 
 import kafka.consumer.ConsumerConfig
 import kafka.utils.ZkUtils
+import kafka.utils.ZKCheckedEphemeral
 import kafka.utils.TestUtils
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.WatchedEvent
+import org.apache.zookeeper.Watcher
+import org.apache.zookeeper.ZooDefs.Ids
+import org.I0Itec.zkclient.exception.{ZkException,ZkNodeExistsException}
 import org.junit.{Test, Assert}
 
 class ZKEphemeralTest extends ZooKeeperTestHarness {
@@ -44,4 +50,96 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
     val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
     Assert.assertFalse(nodeExists)
   }
+
+  /*****
+   ***** Tests for ZkWatchedEphemeral
+   *****/
+
+  /**
+   * Tests basic creation
+   */
+  @Test
+  def testZkWatchedEphemeral = {
+    val path = "/zwe-test"
+    testCreation(path)
+  }
+
+  /**
+   * Tests recursive creation
+   */
+  @Test
+  def testZkWatchedEphemeralRecursive = {
+    val path = "/zwe-test-parent/zwe-test"
+    testCreation(path)
+  }
+
+  private def testCreation(path: String) {
+    val zk = zkConnection.getZookeeper
+    val zwe = new ZKCheckedEphemeral(path, "", zk)
+    var created = false
+    var counter = 10
+
+    zk.exists(path, new Watcher() {
+      def process(event: WatchedEvent) {
+        if(event.getType == Watcher.Event.EventType.NodeCreated) {
+          created = true
+        }
+      }
+    })
+    zwe.create()
+    // Waits until the znode is created
+    TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, path),
+                            "Znode %s wasn't created".format(path))
+  }
+
+  /**
+   * Tests that it fails in the presence of an overlapping
+   * session.
+   */
+  @Test
+  def testOverlappingSessions = {
+    val path = "/zwe-test"
+    val zk1 = zkConnection.getZookeeper
+
+    //Creates a second session
+    val (_, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout)
+    val zk2 = zkConnection2.getZookeeper
+    var zwe = new ZKCheckedEphemeral(path, "", zk2)
+
+    // Creates znode for path in the first session
+    zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
+    
+    //Bootstraps the ZKWatchedEphemeral object
+    var gotException = false;
+    try {
+      zwe.create()
+    } catch {
+      case e: ZkNodeExistsException =>
+        gotException = true
+    }
+    Assert.assertTrue(gotException)
+  }
+  
+  /**
+   * Tests if succeeds with znode from the same session
+   * 
+   */
+  @Test
+  def testSameSession = {
+    val path = "/zwe-test"
+    val zk = zkConnection.getZookeeper
+    // Creates znode for path in the first session
+    zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
+    
+    var zwe = new ZKCheckedEphemeral(path, "", zk)
+    //Bootstraps the ZKWatchedEphemeral object
+    var gotException = false;
+    try {
+      zwe.create()
+    } catch {
+      case e: ZkNodeExistsException =>
+        gotException = true
+    }
+    Assert.assertFalse(gotException)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1daf6ac5/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index e4bfb48..3e1c6e0 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -17,7 +17,7 @@
 
 package kafka.zk
 
-import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import kafka.utils.{ZkUtils, CoreUtils}
 import org.junit.{After, Before}
 import org.scalatest.junit.JUnitSuite
@@ -26,6 +26,7 @@ trait ZooKeeperTestHarness extends JUnitSuite {
   var zkPort: Int = -1
   var zookeeper: EmbeddedZookeeper = null
   var zkClient: ZkClient = null
+  var zkConnection : ZkConnection = null
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
 
@@ -35,7 +36,9 @@ trait ZooKeeperTestHarness extends JUnitSuite {
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
     zkPort = zookeeper.port
-    zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout)
+    val (client, connection) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeout, zkConnectionTimeout)
+    zkClient = client
+    zkConnection = connection
   }
 
   @After


Mime
View raw message