kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-5501; use async zookeeper apis everywhere
Date Wed, 26 Jul 2017 06:36:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 47ee8e954 -> 97fc2ca49


KAFKA-5501; use async zookeeper apis everywhere

Synchronous zookeeper writes means that we wait an entire round trip before doing the next
write. With respect to the controller, these synchronous writes are happening at a per-partition
granularity in several places, so partition-heavy clusters suffer from the controller doing
many sequential round trips to zookeeper.
- PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in zookeeper on transition
to OnlinePartition. This gets triggered per-partition sequentially with synchronous writes
during controlled shutdown of the shutting down broker's replicas for which it is the leader.
- ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to OfflineReplica when
calling KafkaController.removeReplicaFromIsr. This gets triggered per-partition sequentially
with synchronous writes for failed or controlled shutdown brokers.

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3427 from onurkaraman/KAFKA-5501


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/97fc2ca4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/97fc2ca4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/97fc2ca4

Branch: refs/heads/trunk
Commit: 97fc2ca499dc15ecd255a5468a0086ab63e4eecc
Parents: 47ee8e9
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Wed Jul 26 08:36:05 2017 +0200
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Jul 26 08:36:05 2017 +0200

----------------------------------------------------------------------
 .../kafka/controller/ZookeeperClient.scala      | 286 ++++++++++++++++
 .../kafka/controller/ZookeeperClientTest.scala  | 329 +++++++++++++++++++
 2 files changed, 615 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/97fc2ca4/core/src/main/scala/kafka/controller/ZookeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
new file mode 100644
index 0000000..7ffa511
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
+
+import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
+import kafka.utils.Logging
+import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback,
StatCallback, StringCallback, VoidCallback}
+import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.ZooKeeper.States
+import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper}
+
+/**
+  * ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper.
+  *
+  * @param connectString comma separated host:port pairs, each corresponding to a zk server
+  * @param sessionTimeoutMs session timeout in milliseconds
+  * @param connectionTimeoutMs connection timeout in milliseconds
+  * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper
client's EventThread.
+  */
+class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs:
Int, stateChangeHandler: StateChangeHandler) extends Logging {
+  this.logIdent = "[ZookeeperClient]: "
+  private val initializationLock = new ReentrantReadWriteLock()
+  private val isConnectedOrExpiredLock = new ReentrantLock()
+  private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
+  private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]()
+  private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]()
+
+  info(s"Initializing a new session to $connectString.")
+  @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
+  waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+
+  /**
+    * Take an AsyncRequest and wait for its AsyncResponse.
+    * @param request a single AsyncRequest to wait on.
+    * @return the request's AsyncReponse.
+    */
+  def handle(request: AsyncRequest): AsyncResponse = {
+    handle(Seq(request)).head
+  }
+
+  /**
+    * Pipeline a sequence of AsyncRequests and wait for all of their AsyncResponses.
+    * @param requests a sequence of AsyncRequests to wait on.
+    * @return the AsyncResponses.
+    */
+  def handle(requests: Seq[AsyncRequest]): Seq[AsyncResponse] = inReadLock(initializationLock)
{
+    import scala.collection.JavaConverters._
+    val countDownLatch = new CountDownLatch(requests.size)
+    val responseQueue = new ArrayBlockingQueue[AsyncResponse](requests.size)
+    requests.foreach {
+      case CreateRequest(path, data, acl, createMode, ctx) => zooKeeper.create(path, data,
acl.asJava, createMode, new StringCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, name: String) = {
+          responseQueue.add(CreateResponse(rc, path, ctx, name))
+          countDownLatch.countDown()
+        }}, ctx)
+      case DeleteRequest(path, version, ctx) => zooKeeper.delete(path, version, new VoidCallback
{
+        override def processResult(rc: Int, path: String, ctx: Any) = {
+          responseQueue.add(DeleteResponse(rc, path, ctx))
+          countDownLatch.countDown()
+        }}, ctx)
+      case ExistsRequest(path, ctx) => zooKeeper.exists(path, zNodeChangeHandlers.containsKey(path),
new StatCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, stat: Stat) = {
+          responseQueue.add(ExistsResponse(rc, path, ctx, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case GetDataRequest(path, ctx) => zooKeeper.getData(path, zNodeChangeHandlers.containsKey(path),
new DataCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat:
Stat) = {
+          responseQueue.add(GetDataResponse(rc, path, ctx, data, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case SetDataRequest(path, data, version, ctx) => zooKeeper.setData(path, data, version,
new StatCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, stat: Stat) = {
+          responseQueue.add(SetDataResponse(rc, path, ctx, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case GetACLRequest(path, ctx) => zooKeeper.getACL(path, null, new ACLCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL],
stat: Stat): Unit = {
+          responseQueue.add(GetACLResponse(rc, path, ctx, Option(acl).map(_.asScala).orNull,
stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case SetACLRequest(path, acl, version, ctx) => zooKeeper.setACL(path, acl.asJava,
version, new StatCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, stat: Stat) = {
+          responseQueue.add(SetACLResponse(rc, path, ctx, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case GetChildrenRequest(path, ctx) => zooKeeper.getChildren(path, zNodeChildChangeHandlers.containsKey(path),
new Children2Callback {
+        override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String],
stat: Stat) = {
+          responseQueue.add(GetChildrenResponse(rc, path, ctx, Option(children).map(_.asScala).orNull,
stat))
+          countDownLatch.countDown()
+        }}, ctx)
+    }
+    countDownLatch.await()
+    responseQueue.asScala.toSeq
+  }
+
+  /**
+    * Wait indefinitely until the underlying zookeeper client to reaches the CONNECTED state.
+    * @throws ZookeeperClientAuthFailedException if the authentication failed either before
or while waiting for connection.
+    * @throws ZookeeperClientExpiredException if the session expired either before or while
waiting for connection.
+    */
+  def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) {
+    waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS)
+  }
+
+  private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = {
+    info("Waiting until connected.")
+    var nanos = timeUnit.toNanos(timeout)
+    inLock(isConnectedOrExpiredLock) {
+      var state = zooKeeper.getState
+      while (!state.isConnected && state.isAlive) {
+        if (nanos <= 0) {
+          throw new ZookeeperClientTimeoutException(s"Timed out waiting for connection while
in state: $state")
+        }
+        nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
+        state = zooKeeper.getState
+      }
+      if (state == States.AUTH_FAILED) {
+        throw new ZookeeperClientAuthFailedException("Auth failed either before or while
waiting for connection")
+      } else if (state == States.CLOSED) {
+        throw new ZookeeperClientExpiredException("Session expired either before or while
waiting for connection")
+      }
+    }
+    info("Connected.")
+  }
+
+  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): ExistsResponse
= {
+    registerZNodeChangeHandlers(Seq(zNodeChangeHandler)).head
+  }
+
+  def registerZNodeChangeHandlers(handlers: Seq[ZNodeChangeHandler]): Seq[ExistsResponse]
= {
+    handlers.foreach(handler => zNodeChangeHandlers.put(handler.path, handler))
+    val asyncRequests = handlers.map(handler => ExistsRequest(handler.path, null))
+    handle(asyncRequests).asInstanceOf[Seq[ExistsResponse]]
+  }
+
+  def unregisterZNodeChangeHandler(path: String): Unit = {
+    zNodeChangeHandlers.remove(path)
+  }
+
+  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler):
GetChildrenResponse = {
+    registerZNodeChildChangeHandlers(Seq(zNodeChildChangeHandler)).head
+  }
+
+  def registerZNodeChildChangeHandlers(handlers: Seq[ZNodeChildChangeHandler]): Seq[GetChildrenResponse]
= {
+    handlers.foreach(handler => zNodeChildChangeHandlers.put(handler.path, handler))
+    val asyncRequests = handlers.map(handler => GetChildrenRequest(handler.path, null))
+    handle(asyncRequests).asInstanceOf[Seq[GetChildrenResponse]]
+  }
+
+  def unregisterZNodeChildChangeHandler(path: String): Unit = {
+    zNodeChildChangeHandlers.remove(path)
+  }
+
+  def close(): Unit = inWriteLock(initializationLock) {
+    info("Closing.")
+    zNodeChangeHandlers.clear()
+    zNodeChildChangeHandlers.clear()
+    zooKeeper.close()
+    info("Closed.")
+  }
+
+  private def initialize(): Unit = {
+    if (!zooKeeper.getState.isAlive) {
+      info(s"Initializing a new session to $connectString.")
+      var now = System.currentTimeMillis()
+      val threshold = now + connectionTimeoutMs
+      while (now < threshold) {
+        try {
+          zooKeeper.close()
+          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
+          waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
+          return
+        } catch {
+          case _: Exception =>
+            now = System.currentTimeMillis()
+            if (now < threshold) {
+              Thread.sleep(1000)
+              now = System.currentTimeMillis()
+            }
+        }
+      }
+      info(s"Timed out waiting for connection during session initialization while in state:
${zooKeeper.getState}")
+      stateChangeHandler.onConnectionTimeout
+    }
+  }
+
+  private object ZookeeperClientWatcher extends Watcher {
+    override def process(event: WatchedEvent): Unit = {
+      debug("Received event: " + event)
+      if (event.getPath == null) {
+        inLock(isConnectedOrExpiredLock) {
+          isConnectedOrExpiredCondition.signalAll()
+        }
+        if (event.getState == KeeperState.AuthFailed) {
+          info("Auth failed.")
+          stateChangeHandler.onAuthFailure
+        } else if (event.getState == KeeperState.Expired) {
+          inWriteLock(initializationLock) {
+            info("Session expired.")
+            stateChangeHandler.beforeInitializingSession
+            initialize()
+            stateChangeHandler.afterInitializingSession
+          }
+        }
+      } else if (event.getType == EventType.NodeCreated) {
+        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleCreation)
+      } else if (event.getType == EventType.NodeDeleted) {
+        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDeletion)
+      } else if (event.getType == EventType.NodeDataChanged) {
+        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDataChange)
+      } else if (event.getType == EventType.NodeChildrenChanged) {
+        Option(zNodeChildChangeHandlers.get(event.getPath)).foreach(_.handleChildChange)
+      }
+    }
+  }
+}
+
+trait StateChangeHandler {
+  def beforeInitializingSession: Unit
+  def afterInitializingSession: Unit
+  def onAuthFailure: Unit
+  def onConnectionTimeout: Unit
+}
+
+trait ZNodeChangeHandler {
+  val path: String
+  def handleCreation: Unit
+  def handleDeletion: Unit
+  def handleDataChange: Unit
+}
+
+trait ZNodeChildChangeHandler {
+  val path: String
+  def handleChildChange: Unit
+}
+
+sealed trait AsyncRequest {
+  val path: String
+  val ctx: Any
+}
+case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode,
ctx: Any) extends AsyncRequest
+case class DeleteRequest(path: String, version: Int, ctx: Any) extends AsyncRequest
+case class ExistsRequest(path: String, ctx: Any) extends AsyncRequest
+case class GetDataRequest(path: String, ctx: Any) extends AsyncRequest
+case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Any) extends
AsyncRequest
+case class GetACLRequest(path: String, ctx: Any) extends AsyncRequest
+case class SetACLRequest(path: String, acl: Seq[ACL], version: Int, ctx: Any) extends AsyncRequest
+case class GetChildrenRequest(path: String, ctx: Any) extends AsyncRequest
+
+sealed trait AsyncResponse {
+  val rc: Int
+  val path: String
+  val ctx: Any
+}
+case class CreateResponse(rc: Int, path: String, ctx: Any, name: String) extends AsyncResponse
+case class DeleteResponse(rc: Int, path: String, ctx: Any) extends AsyncResponse
+case class ExistsResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse
+case class GetDataResponse(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat)
extends AsyncResponse
+case class SetDataResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse
+case class GetACLResponse(rc: Int, path: String, ctx: Any, acl: Seq[ACL], stat: Stat) extends
AsyncResponse
+case class SetACLResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse
+case class GetChildrenResponse(rc: Int, path: String, ctx: Any, children: Seq[String], stat:
Stat) extends AsyncResponse
+
+class ZookeeperClientException(message: String) extends RuntimeException(message)
+class ZookeeperClientExpiredException(message: String) extends ZookeeperClientException(message)
+class ZookeeperClientAuthFailedException(message: String) extends ZookeeperClientException(message)
+class ZookeeperClientTimeoutException(message: String) extends ZookeeperClientException(message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/97fc2ca4/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
new file mode 100644
index 0000000..70a2409
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import java.net.UnknownHostException
+import java.nio.charset.StandardCharsets
+import java.util.UUID
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
+
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, ZooDefs}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.junit.{After, Test}
+
+class ZookeeperClientTest extends ZooKeeperTestHarness {
+  private val mockPath = "/foo"
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    Configuration.setConfiguration(null)
+  }
+
+  @Test(expected = classOf[UnknownHostException])
+  def testUnresolvableConnectString(): Unit = {
+    new ZookeeperClient("-1", -1, -1, null)
+  }
+
+  @Test(expected = classOf[ZookeeperClientTimeoutException])
+  def testConnectionTimeout(): Unit = {
+    zookeeper.shutdown()
+    new ZookeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null)
+  }
+
+  @Test
+  def testConnection(): Unit = {
+    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+  }
+
+  @Test
+  def testDeleteNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(deleteResponse.rc))
+  }
+
+  @Test
+  def testDeleteExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse]
+    assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc))
+  }
+
+  @Test
+  def testExistsNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, null)).asInstanceOf[ExistsResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(existsResponse.rc))
+  }
+
+  @Test
+  def testExistsExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, null)).asInstanceOf[ExistsResponse]
+    assertEquals("Response code for exists should be OK", Code.OK, Code.get(existsResponse.rc))
+  }
+
+  @Test
+  def testGetDataNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getDataResponse.rc))
+  }
+
+  @Test
+  def testGetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse]
+    assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc))
+    assertArrayEquals("Data for getData should match created znode data", data, getDataResponse.data)
+  }
+
+  @Test
+  def testSetDataNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte],
-1, null)).asInstanceOf[SetDataResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(setDataResponse.rc))
+  }
+
+  @Test
+  def testSetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, data, -1, null)).asInstanceOf[SetDataResponse]
+    assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc))
+    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse]
+    assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc))
+    assertArrayEquals("Data for getData should match setData's data", data, getDataResponse.data)
+  }
+
+  @Test
+  def testGetACLNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, null)).asInstanceOf[GetACLResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getACLResponse.rc))
+  }
+
+  @Test
+  def testGetACLExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, null)).asInstanceOf[GetACLResponse]
+    assertEquals("Response code for getACL should be OK", Code.OK, Code.get(getACLResponse.rc))
+    assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
getACLResponse.acl)
+  }
+
+  @Test
+  def testSetACLNonExistentZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val setACLResponse = zookeeperClient.handle(SetACLRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
-1, null)).asInstanceOf[SetACLResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(setACLResponse.rc))
+  }
+
+  @Test
+  def testGetChildrenNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getChildrenResponse.rc))
+  }
+
+  @Test
+  def testGetChildrenExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc))
+    assertEquals("getChildren should return no children", Seq.empty[String], getChildrenResponse.children)
+  }
+
+  @Test
+  def testGetChildrenExistingZNodeWithChildren(): Unit = {
+    import scala.collection.JavaConverters._
+    val child1 = "child1"
+    val child2 = "child2"
+    val child1Path = mockPath + "/" + child1
+    val child2Path = mockPath + "/" + child2
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc))
+    val createResponseChild2 = zookeeperClient.handle(CreateRequest(child2Path, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create child2 should be OK", Code.OK, Code.get(createResponseChild2.rc))
+
+    val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc))
+    assertEquals("getChildren should return two children", Seq(child1, child2), getChildrenResponse.children.sorted)
+  }
+
+  @Test
+  def testPipelinedGetData(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes,
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    val createResponses = createRequests.map(zookeeperClient.handle)
+    createResponses.foreach(createResponse => assertEquals("Response code for create should
be OK", Code.OK, Code.get(createResponse.rc)))
+    val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x, null))
+    val getDataResponses = zookeeperClient.handle(getDataRequests)
+    getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData
should be OK", Code.OK, Code.get(getDataResponse.rc)))
+    getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) =>
+      assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc))
+      assertEquals("Data for getData should match", ((i + 1) * 2), Integer.valueOf(new String(getDataResponse.asInstanceOf[GetDataResponse].data)))
+    }
+  }
+
+  @Test
+  def testMixedPipeline(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val getDataRequest = GetDataRequest(mockPath, null)
+    val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1, null)
+    val responses = zookeeperClient.handle(Seq(getDataRequest, setDataRequest))
+    assertEquals("Response code for getData should be OK", Code.OK, Code.get(responses.head.rc))
+    assertArrayEquals("Data for getData should be empty", Array.empty[Byte], responses.head.asInstanceOf[GetDataResponse].data)
+    assertEquals("Response code for setData should be NONODE", Code.NONODE, Code.get(responses.last.rc))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForCreation(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override def handleDeletion = {}
+      override def handleDataChange = {}
+      override val path: String = mockPath
+    }
+
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDeletion(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation = {}
+      override def handleDeletion = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override def handleDataChange = {}
+      override val path: String = mockPath
+    }
+
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse]
+    assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc))
+    assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation = {}
+      override def handleDeletion = {}
+      override def handleDataChange = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte],
-1, null)).asInstanceOf[SetDataResponse]
+    assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc))
+    assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChildChangeHandlerForChildChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
+      override def handleChildChange = {
+        zNodeChildChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    val child1 = "child1"
+    val child1Path = mockPath + "/" + child1
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+    val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc))
+    assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testStateChangeHandlerForAuthFailure(): Unit = {
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf")
+    val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val stateChangeHandler = new StateChangeHandler {
+      override def beforeInitializingSession = {}
+      override def afterInitializingSession = {}
+      override def onAuthFailure = {
+        stateChangeHandlerCountDownLatch.countDown()
+      }
+      override def onConnectionTimeout = {}
+    }
+    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
+    assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
+}


Mime
View raw message