kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5501; introduce async ZookeeperClient [Forced Update!]
Date Wed, 26 Jul 2017 07:03:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 97fc2ca49 -> e5e88f636 (forced update)


KAFKA-5501; introduce async ZookeeperClient

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3427 from onurkaraman/KAFKA-5501


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5e88f63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5e88f63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5e88f63

Branch: refs/heads/trunk
Commit: e5e88f636fa9857105324f62cc758b1fed3602bb
Parents: 47ee8e9
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Wed Jul 26 08:36:05 2017 +0200
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Jul 26 08:03:10 2017 +0100

----------------------------------------------------------------------
 .../kafka/controller/ZookeeperClient.scala      | 286 ++++++++++++++++
 .../kafka/controller/ZookeeperClientTest.scala  | 329 +++++++++++++++++++
 2 files changed, 615 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e5e88f63/core/src/main/scala/kafka/controller/ZookeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
new file mode 100644
index 0000000..7ffa511
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
+
+import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
+import kafka.utils.Logging
+import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback,
StatCallback, StringCallback, VoidCallback}
+import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.ZooKeeper.States
+import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper}
+
+/**
+  * ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper.
+  *
+  * @param connectString comma separated host:port pairs, each corresponding to a zk server
+  * @param sessionTimeoutMs session timeout in milliseconds
+  * @param connectionTimeoutMs connection timeout in milliseconds
+  * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper
client's EventThread.
+  */
+class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs:
Int, stateChangeHandler: StateChangeHandler) extends Logging {
+  this.logIdent = "[ZookeeperClient]: "
+  private val initializationLock = new ReentrantReadWriteLock()
+  private val isConnectedOrExpiredLock = new ReentrantLock()
+  private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
+  private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]()
+  private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]()
+
+  info(s"Initializing a new session to $connectString.")
+  @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
+  waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+
+  /**
+    * Take an AsyncRequest and wait for its AsyncResponse.
+    * @param request a single AsyncRequest to wait on.
+    * @return the request's AsyncResponse.
+    */
+  def handle(request: AsyncRequest): AsyncResponse = {
+    handle(Seq(request)).head
+  }
+
+  /**
+    * Pipeline a sequence of AsyncRequests and block until all of their AsyncResponses have arrived.
+    * @param requests a sequence of AsyncRequests to wait on; may be empty, in which case no responses are returned.
+    * @return the AsyncResponses, in the order in which their callbacks completed.
+    */
+  def handle(requests: Seq[AsyncRequest]): Seq[AsyncResponse] = inReadLock(initializationLock) {
+    import scala.collection.JavaConverters._
+    val countDownLatch = new CountDownLatch(requests.size)
+    val responseQueue = new ArrayBlockingQueue[AsyncResponse](math.max(requests.size, 1)) // capacity must be >= 1, even for an empty batch
+    requests.foreach {
+      case CreateRequest(path, data, acl, createMode, ctx) => zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, name: String) = {
+          responseQueue.add(CreateResponse(rc, path, ctx, name))
+          countDownLatch.countDown()
+        }}, ctx)
+      case DeleteRequest(path, version, ctx) => zooKeeper.delete(path, version, new VoidCallback {
+        override def processResult(rc: Int, path: String, ctx: Any) = {
+          responseQueue.add(DeleteResponse(rc, path, ctx))
+          countDownLatch.countDown()
+        }}, ctx)
+      case ExistsRequest(path, ctx) => zooKeeper.exists(path, zNodeChangeHandlers.containsKey(path), new StatCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, stat: Stat) = {
+          responseQueue.add(ExistsResponse(rc, path, ctx, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case GetDataRequest(path, ctx) => zooKeeper.getData(path, zNodeChangeHandlers.containsKey(path), new DataCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat) = {
+          responseQueue.add(GetDataResponse(rc, path, ctx, data, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case SetDataRequest(path, data, version, ctx) => zooKeeper.setData(path, data, version, new StatCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, stat: Stat) = {
+          responseQueue.add(SetDataResponse(rc, path, ctx, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case GetACLRequest(path, ctx) => zooKeeper.getACL(path, null, new ACLCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = {
+          responseQueue.add(GetACLResponse(rc, path, ctx, Option(acl).map(_.asScala).orNull, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case SetACLRequest(path, acl, version, ctx) => zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
+        override def processResult(rc: Int, path: String, ctx: Any, stat: Stat) = {
+          responseQueue.add(SetACLResponse(rc, path, ctx, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+      case GetChildrenRequest(path, ctx) => zooKeeper.getChildren(path, zNodeChildChangeHandlers.containsKey(path), new Children2Callback {
+        override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat) = {
+          responseQueue.add(GetChildrenResponse(rc, path, ctx, Option(children).map(_.asScala).orNull, stat))
+          countDownLatch.countDown()
+        }}, ctx)
+    }
+    countDownLatch.await()
+    responseQueue.asScala.toSeq
+  }
+
+  /**
+    * Wait indefinitely until the underlying zookeeper client reaches the CONNECTED state.
+    * @throws ZookeeperClientAuthFailedException if the authentication failed either before
or while waiting for connection.
+    * @throws ZookeeperClientExpiredException if the session expired either before or while
waiting for connection.
+    */
+  def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) {
+    waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS)
+  }
+
+  private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = {
+    info("Waiting until connected.")
+    var nanos = timeUnit.toNanos(timeout)
+    inLock(isConnectedOrExpiredLock) {
+      var state = zooKeeper.getState
+      while (!state.isConnected && state.isAlive) {
+        if (nanos <= 0) {
+          throw new ZookeeperClientTimeoutException(s"Timed out waiting for connection while
in state: $state")
+        }
+        nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
+        state = zooKeeper.getState
+      }
+      if (state == States.AUTH_FAILED) {
+        throw new ZookeeperClientAuthFailedException("Auth failed either before or while
waiting for connection")
+      } else if (state == States.CLOSED) {
+        throw new ZookeeperClientExpiredException("Session expired either before or while
waiting for connection")
+      }
+    }
+    info("Connected.")
+  }
+
+  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): ExistsResponse
= {
+    registerZNodeChangeHandlers(Seq(zNodeChangeHandler)).head
+  }
+
+  def registerZNodeChangeHandlers(handlers: Seq[ZNodeChangeHandler]): Seq[ExistsResponse]
= {
+    handlers.foreach(handler => zNodeChangeHandlers.put(handler.path, handler))
+    val asyncRequests = handlers.map(handler => ExistsRequest(handler.path, null))
+    handle(asyncRequests).asInstanceOf[Seq[ExistsResponse]]
+  }
+
+  def unregisterZNodeChangeHandler(path: String): Unit = {
+    zNodeChangeHandlers.remove(path)
+  }
+
+  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler):
GetChildrenResponse = {
+    registerZNodeChildChangeHandlers(Seq(zNodeChildChangeHandler)).head
+  }
+
+  def registerZNodeChildChangeHandlers(handlers: Seq[ZNodeChildChangeHandler]): Seq[GetChildrenResponse]
= {
+    handlers.foreach(handler => zNodeChildChangeHandlers.put(handler.path, handler))
+    val asyncRequests = handlers.map(handler => GetChildrenRequest(handler.path, null))
+    handle(asyncRequests).asInstanceOf[Seq[GetChildrenResponse]]
+  }
+
+  def unregisterZNodeChildChangeHandler(path: String): Unit = {
+    zNodeChildChangeHandlers.remove(path)
+  }
+
+  def close(): Unit = inWriteLock(initializationLock) {
+    info("Closing.")
+    zNodeChangeHandlers.clear()
+    zNodeChildChangeHandlers.clear()
+    zooKeeper.close()
+    info("Closed.")
+  }
+
+  private def initialize(): Unit = {
+    if (!zooKeeper.getState.isAlive) {
+      info(s"Initializing a new session to $connectString.")
+      var now = System.currentTimeMillis()
+      val threshold = now + connectionTimeoutMs
+      while (now < threshold) {
+        try {
+          zooKeeper.close()
+          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
+          waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
+          return
+        } catch {
+          case _: Exception =>
+            now = System.currentTimeMillis()
+            if (now < threshold) {
+              Thread.sleep(1000)
+              now = System.currentTimeMillis()
+            }
+        }
+      }
+      info(s"Timed out waiting for connection during session initialization while in state:
${zooKeeper.getState}")
+      stateChangeHandler.onConnectionTimeout
+    }
+  }
+
+  private object ZookeeperClientWatcher extends Watcher {
+    override def process(event: WatchedEvent): Unit = {
+      debug("Received event: " + event)
+      if (event.getPath == null) {
+        inLock(isConnectedOrExpiredLock) {
+          isConnectedOrExpiredCondition.signalAll()
+        }
+        if (event.getState == KeeperState.AuthFailed) {
+          info("Auth failed.")
+          stateChangeHandler.onAuthFailure
+        } else if (event.getState == KeeperState.Expired) {
+          inWriteLock(initializationLock) {
+            info("Session expired.")
+            stateChangeHandler.beforeInitializingSession
+            initialize()
+            stateChangeHandler.afterInitializingSession
+          }
+        }
+      } else if (event.getType == EventType.NodeCreated) {
+        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleCreation)
+      } else if (event.getType == EventType.NodeDeleted) {
+        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDeletion)
+      } else if (event.getType == EventType.NodeDataChanged) {
+        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDataChange)
+      } else if (event.getType == EventType.NodeChildrenChanged) {
+        Option(zNodeChildChangeHandlers.get(event.getPath)).foreach(_.handleChildChange)
+      }
+    }
+  }
+}
+
+trait StateChangeHandler {
+  def beforeInitializingSession: Unit
+  def afterInitializingSession: Unit
+  def onAuthFailure: Unit
+  def onConnectionTimeout: Unit
+}
+
+trait ZNodeChangeHandler {
+  val path: String
+  def handleCreation: Unit
+  def handleDeletion: Unit
+  def handleDataChange: Unit
+}
+
+trait ZNodeChildChangeHandler {
+  val path: String
+  def handleChildChange: Unit
+}
+
+sealed trait AsyncRequest {
+  val path: String
+  val ctx: Any
+}
+case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode,
ctx: Any) extends AsyncRequest
+case class DeleteRequest(path: String, version: Int, ctx: Any) extends AsyncRequest
+case class ExistsRequest(path: String, ctx: Any) extends AsyncRequest
+case class GetDataRequest(path: String, ctx: Any) extends AsyncRequest
+case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Any) extends
AsyncRequest
+case class GetACLRequest(path: String, ctx: Any) extends AsyncRequest
+case class SetACLRequest(path: String, acl: Seq[ACL], version: Int, ctx: Any) extends AsyncRequest
+case class GetChildrenRequest(path: String, ctx: Any) extends AsyncRequest
+
+sealed trait AsyncResponse {
+  val rc: Int
+  val path: String
+  val ctx: Any
+}
+case class CreateResponse(rc: Int, path: String, ctx: Any, name: String) extends AsyncResponse
+case class DeleteResponse(rc: Int, path: String, ctx: Any) extends AsyncResponse
+case class ExistsResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse
+case class GetDataResponse(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat)
extends AsyncResponse
+case class SetDataResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse
+case class GetACLResponse(rc: Int, path: String, ctx: Any, acl: Seq[ACL], stat: Stat) extends
AsyncResponse
+case class SetACLResponse(rc: Int, path: String, ctx: Any, stat: Stat) extends AsyncResponse
+case class GetChildrenResponse(rc: Int, path: String, ctx: Any, children: Seq[String], stat:
Stat) extends AsyncResponse
+
+class ZookeeperClientException(message: String) extends RuntimeException(message)
+class ZookeeperClientExpiredException(message: String) extends ZookeeperClientException(message)
+class ZookeeperClientAuthFailedException(message: String) extends ZookeeperClientException(message)
+class ZookeeperClientTimeoutException(message: String) extends ZookeeperClientException(message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5e88f63/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
new file mode 100644
index 0000000..70a2409
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import java.net.UnknownHostException
+import java.nio.charset.StandardCharsets
+import java.util.UUID
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
+
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, ZooDefs}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.junit.{After, Test}
+
+class ZookeeperClientTest extends ZooKeeperTestHarness {
+  private val mockPath = "/foo"
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    Configuration.setConfiguration(null)
+  }
+
+  @Test(expected = classOf[UnknownHostException])
+  def testUnresolvableConnectString(): Unit = {
+    new ZookeeperClient("-1", -1, -1, null)
+  }
+
+  @Test(expected = classOf[ZookeeperClientTimeoutException])
+  def testConnectionTimeout(): Unit = {
+    zookeeper.shutdown()
+    new ZookeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null)
+  }
+
+  @Test
+  def testConnection(): Unit = {
+    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+  }
+
+  @Test
+  def testDeleteNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(deleteResponse.rc))
+  }
+
+  @Test
+  def testDeleteExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse]
+    assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc))
+  }
+
+  @Test
+  def testExistsNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, null)).asInstanceOf[ExistsResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(existsResponse.rc))
+  }
+
+  @Test
+  def testExistsExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val existsResponse = zookeeperClient.handle(ExistsRequest(mockPath, null)).asInstanceOf[ExistsResponse]
+    assertEquals("Response code for exists should be OK", Code.OK, Code.get(existsResponse.rc))
+  }
+
+  @Test
+  def testGetDataNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getDataResponse.rc))
+  }
+
+  @Test
+  def testGetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse]
+    assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc))
+    assertArrayEquals("Data for getData should match created znode data", data, getDataResponse.data)
+  }
+
+  @Test
+  def testSetDataNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte],
-1, null)).asInstanceOf[SetDataResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(setDataResponse.rc))
+  }
+
+  @Test
+  def testSetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, data, -1, null)).asInstanceOf[SetDataResponse]
+    assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc))
+    val getDataResponse = zookeeperClient.handle(GetDataRequest(mockPath, null)).asInstanceOf[GetDataResponse]
+    assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc))
+    assertArrayEquals("Data for getData should match setData's data", data, getDataResponse.data)
+  }
+
+  @Test
+  def testGetACLNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, null)).asInstanceOf[GetACLResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getACLResponse.rc))
+  }
+
+  @Test
+  def testGetACLExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val getACLResponse = zookeeperClient.handle(GetACLRequest(mockPath, null)).asInstanceOf[GetACLResponse]
+    assertEquals("Response code for getACL should be OK", Code.OK, Code.get(getACLResponse.rc))
+    assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
getACLResponse.acl)
+  }
+
+  @Test
+  def testSetACLNonExistentZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val setACLResponse = zookeeperClient.handle(SetACLRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
-1, null)).asInstanceOf[SetACLResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(setACLResponse.rc))
+  }
+
+  @Test
+  def testGetChildrenNonExistentZNode(): Unit = {
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code should be NONODE", Code.NONODE, Code.get(getChildrenResponse.rc))
+  }
+
+  @Test
+  def testGetChildrenExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc))
+    assertEquals("getChildren should return no children", Seq.empty[String], getChildrenResponse.children)
+  }
+
+  @Test
+  def testGetChildrenExistingZNodeWithChildren(): Unit = {
+    import scala.collection.JavaConverters._
+    val child1 = "child1"
+    val child2 = "child2"
+    val child1Path = mockPath + "/" + child1
+    val child2Path = mockPath + "/" + child2
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc))
+    val createResponseChild2 = zookeeperClient.handle(CreateRequest(child2Path, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create child2 should be OK", Code.OK, Code.get(createResponseChild2.rc))
+
+    val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc))
+    assertEquals("getChildren should return two children", Seq(child1, child2), getChildrenResponse.children.sorted)
+  }
+
+  @Test
+  def testPipelinedGetData(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes,
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    val createResponses = createRequests.map(zookeeperClient.handle)
+    createResponses.foreach(createResponse => assertEquals("Response code for create should
be OK", Code.OK, Code.get(createResponse.rc)))
+    val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x, null))
+    val getDataResponses = zookeeperClient.handle(getDataRequests)
+    getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData
should be OK", Code.OK, Code.get(getDataResponse.rc)))
+    getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) =>
+      assertEquals("Response code for getData should be OK", Code.OK, Code.get(getDataResponse.rc))
+      assertEquals("Data for getData should match", ((i + 1) * 2), Integer.valueOf(new String(getDataResponse.asInstanceOf[GetDataResponse].data)))
+    }
+  }
+
+  @Test
+  def testMixedPipeline(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val getDataRequest = GetDataRequest(mockPath, null)
+    val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1, null)
+    val responses = zookeeperClient.handle(Seq(getDataRequest, setDataRequest))
+    assertEquals("Response code for getData should be OK", Code.OK, Code.get(responses.head.rc))
+    assertArrayEquals("Data for getData should be empty", Array.empty[Byte], responses.head.asInstanceOf[GetDataResponse].data)
+    assertEquals("Response code for setData should be NONODE", Code.NONODE, Code.get(responses.last.rc))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForCreation(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override def handleDeletion = {}
+      override def handleDataChange = {}
+      override val path: String = mockPath
+    }
+
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDeletion(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation = {}
+      override def handleDeletion = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override def handleDataChange = {}
+      override val path: String = mockPath
+    }
+
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse]
+    assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc))
+    assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation = {}
+      override def handleDeletion = {}
+      override def handleDataChange = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte],
-1, null)).asInstanceOf[SetDataResponse]
+    assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc))
+    assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChildChangeHandlerForChildChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
null)
+    val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
+      override def handleChildChange = {
+        zNodeChildChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    val child1 = "child1"
+    val child1Path = mockPath + "/" + child1
+    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+    val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
+    assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc))
+    assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testStateChangeHandlerForAuthFailure(): Unit = {
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf")
+    val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val stateChangeHandler = new StateChangeHandler {
+      override def beforeInitializingSession = {}
+      override def afterInitializingSession = {}
+      override def onAuthFailure = {
+        stateChangeHandlerCountDownLatch.countDown()
+      }
+      override def onConnectionTimeout = {}
+    }
+    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
+    assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5,
TimeUnit.SECONDS))
+  }
+
+  private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
+}


Mime
View raw message