kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [kafka] 03/09: Additional tests to improve test coverage of KafkaZkClient.
Date Fri, 02 Mar 2018 01:48:35 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d0eb552a891ecd99150ddf4b89985525806c6e31
Author: Sandor Murakozi <smurakozi@gmail.com>
AuthorDate: Mon Feb 19 16:52:46 2018 +0100

    Additional tests to improve test coverage of KafkaZkClient.
---
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 558 ++++++++++++++++++++-
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  18 +-
 2 files changed, 547 insertions(+), 29 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c2..9329430 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
 */
 package kafka.zk
 
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -30,16 +31,30 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
 import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
 import org.junit.Assert._
 import org.junit.Test
-
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 import scala.util.Random
 
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.zookeeper.data.Stat
+
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
+  private val topic1 = "topic1"
+  private val topic2 = "topic2"
+
+  val topicPartition10 = new TopicPartition(topic1, 0)
+  val topicPartition11 = new TopicPartition(topic1, 1)
+  val topicPartition20 = new TopicPartition(topic2, 0)
+  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
@@ -90,17 +105,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicAssignmentMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
 
     // test with non-existing topic
+    assertFalse(zkClient.topicExists(topic1))
     assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
     assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
 
+    val topicPartition = new TopicPartition(topic1, 0)
     val assignment = Map(
-      new TopicPartition(topic1, 0) -> Seq(0, 1),
+      topicPartition -> Seq(0, 1),
       new TopicPartition(topic1, 1) -> Seq(0, 1),
       new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
     )
@@ -108,6 +124,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // create a topic assignment
     zkClient.createTopicAssignment(topic1, assignment)
 
+    assertTrue(zkClient.topicExists(topic1))
+
     val expectedAssignment = assignment map { topicAssignment =>
       val partition = topicAssignment._1.partition
       val assignment = topicAssignment._2
@@ -215,6 +233,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testIsrChangeNotificationGetters(): Unit = {
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+
+    assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
+
+    // A partition can have multiple notifications
+    assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
+      zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
+  }
+
+  @Test
+  def testIsrChangeNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+    zkClient.propagateIsrChanges(Set(topicPartition11))
+
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+    // Should not fail if called on a non-existent notification
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+
+    assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
+    zkClient.deleteIsrChangeNotifications()
+    assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+  }
+
+  @Test
   def testPropagateLogDir(): Unit = {
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -238,6 +293,52 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testLogDirGetters(): Unit = {
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    zkClient.propagateLogDirEvent(brokerId)
+
+    assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.propagateLogDirEvent(brokerId)
+
+    val anotherBrokerId = 4
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    val notifications012 = Seq("0000000000", "0000000001", "0000000002")
+    assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
+    assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
+  }
+
+  @Test
+  def testLogDirEventNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    val anotherBrokerId = 4
+
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
+
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications()
+    assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
+  }
+
+  @Test
   def testSetGetAndDeletePartitionReassignment() {
     zkClient.createRecursive(AdminZNode.path)
 
@@ -377,10 +478,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeleteTopicPathMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+  def testDeletePath() {
+    val path = "/a/b/c"
+    zkClient.createRecursive(path)
+    zkClient.deletePath(path)
+    assertFalse(zkClient.pathExists(path))
+  }
 
+  @Test
+  def testDeleteTopicZNode(): Unit ={
+    zkClient.deleteTopicZNode(topic1)
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.deleteTopicZNode(topic1)
+    assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
+  }
+
+  @Test
+  def testDeleteTopicPathMethods() {
     assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
     assertTrue(zkClient.getTopicDeletions.isEmpty)
 
@@ -394,17 +508,25 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
+  private def assertPathExistenceAndData(expectedPath: String, data: String){
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some(data), dataAsString(expectedPath))
+   }
+
   @Test
-  def testEntityConfigManagementMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+  def testCreateTokenChangeNotification() {
+    intercept[NoNodeException] {
+      zkClient.createTokenChangeNotification("delegationToken")
+    }
+    zkClient.createDelegationTokenPaths()
 
-    assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
+    zkClient.createTokenChangeNotification("delegationToken")
+    assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+  }
 
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, "1024")
-    logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+  @Test
+  def testEntityConfigManagementMethods() {
+    assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
 
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
     assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
@@ -421,15 +543,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testBrokerRegistrationMethods() {
+  def testCreateConfigChangeNotification() {
+    intercept[NoNodeException] {
+      zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+    }
+
+    zkClient.createTopLevelPaths()
+    zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+
+    assertPathExistenceAndData(
+      s"/config/changes/config_change_0000000000",
+      """{"version":2,"entity_path":"/config/topics/topic1"}""")
+  }
+
+  private def createLogProps(bytesProp: Int) = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps
+  }
+
+  private val logProps = createLogProps(1024)
+
+  @Test
+  def testGetLogConfigs() {
+    val emptyConfig = LogConfig(Collections.emptyMap())
+    assertEquals("Non existent config, no defaults",
+      (Map(topic1 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+
+    val logProps2 = createLogProps(2048)
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+    assertEquals("One existing and one non-existent topic",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
+    assertEquals("Two existing topics",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    val logProps1WithMoreValues = createLogProps(1024)
+    logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
+    logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
+
+    assertEquals("Config with defaults",
+      (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1),
+        Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
+  }
+
+  private def createBrokerInfo(id: Int, host: String, port: Int,
+                               securityProtocol: SecurityProtocol, rack: Option[String] = None) =
+    BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+    (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
+  @Test
+  def testRegisterBrokerInfo() {
     zkClient.createTopLevelPaths()
 
-    val brokerInfo = BrokerInfo(Broker(1,
-      Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
-      rack = None), ApiVersion.latestVersion, jmxPort = 9998)
+    val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
 
     zkClient.registerBrokerInZk(brokerInfo)
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+    assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
+
+    // Node exists, owned by current session - no error, no update
+    zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+
+    // Other client tries to register broker with same id causes failure, info is not changed in ZK
+    intercept[NodeExistsException] {
+      otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    }
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+  }
+
+  @Test
+  def testGetBrokerMethods() {
+    zkClient.createTopLevelPaths()
+
+    assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
+    assertEquals(Seq.empty, zkClient.getSortedBrokerList())
+    assertEquals(None, zkClient.getBroker(0))
+
+    val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
+    val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
+
+    zkClient.registerBrokerInZk(brokerInfo1)
+    otherZkClient.registerBrokerInZk(brokerInfo0)
+
+    assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
+    assertEquals(
+      Seq(brokerInfo0.broker, brokerInfo1.broker),
+      zkClient.getAllBrokersInCluster
+    )
+    assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
+  }
+
+  @Test
+  def testUpdateBrokerInfo() {
+    zkClient.createTopLevelPaths()
+
+    // Updating info of a broker not existing in ZK fails
+    val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    intercept[NoNodeException]{
+      zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    }
+
+    zkClient.registerBrokerInZk(originalBrokerInfo)
+
+    val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+    zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+    assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+    // Other ZK clients can update info
+    otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
+  }
+
+  private def statWithVersion(version: Int) = {
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    stat.setVersion(version)
+    stat
+  }
+
+  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
+    topicPartition10 -> LeaderIsrAndControllerEpoch(
+      LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+      controllerEpoch = 4),
+    topicPartition11 -> LeaderIsrAndControllerEpoch(
+      LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+      controllerEpoch = 4))
+
+  private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
+  private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
+
+  private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
+  private def leaderIsrs(state: Int, zkVersion: Int) =
+    leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+
+  private def checkUpdateLeaderAndIsrResult(
+                  expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
+                  expectedPartitionsToRetry: Seq[TopicPartition],
+                  expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
+                  actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
+    val failedPartitionsExcerpt =
+      actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
+    assertEquals("Permanently failed updates do not match expected",
+      expectedFailedPartitions, failedPartitionsExcerpt)
+    assertEquals("Retriable updates (due to BADVERSION) do not match expected",
+      expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
+    assertEquals("Successful updates do not match expected",
+      expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
+  }
+
+  @Test
+  def testUpdateLeaderAndIsr() {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Non-existing topicPartitions
+    checkUpdateLeaderAndIsrResult(
+        Map.empty,
+        mutable.ArrayBuffer.empty,
+      Map(
+        topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
+        topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
+      zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 1, zkVersion = 1),
+      mutable.ArrayBuffer.empty,
+      Map.empty,
+      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+
+    // Try to update with wrong ZK version
+    checkUpdateLeaderAndIsrResult(
+      Map.empty,
+      ArrayBuffer(topicPartition10, topicPartition11),
+      Map.empty,
+      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+
+    // Trigger successful, to be retried and failed partitions in same call
+    val mixedState = Map(
+      topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
+      topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
+      topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+      ArrayBuffer(topicPartition11),
+      Map(
+        topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
+      zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+  }
+
+  private def checkGetDataResponse(
+      leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
+      topicPartition: TopicPartition,
+      response: GetDataResponse) = {
+    val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
+    assertEquals(Code.OK, response.resultCode)
+    assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
+    assertEquals(Some(topicPartition), response.ctx)
+    assertEquals(
+      Some(leaderIsrAndControllerEpochs(topicPartition)),
+      TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
+  }
+
+  private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
+
+  @Test
+  def testGetTopicsAndPartitions() {
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.createRecursive(TopicZNode.path(topic2))
+    assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
+  }
+
+  @Test
+  def testCreateAndGetTopicPartitionStatesRaw() {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    assertEquals(
+      Seq(
+        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
+        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+        .map(eraseMetadata).toList)
+
+    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
+
+    // Trying to create existing topicPartition states fails
+    assertEquals(
+      Seq(
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          null, ResponseMetadata(0, 0)),
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          null, ResponseMetadata(0, 0))),
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+  }
+
+  @Test
+  def testSetTopicPartitionStatesRaw() {
+
+    def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
+      topicPartitions.map { topicPartition =>
+        SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
+          Some(topicPartition), stat, ResponseMetadata(0, 0))
+      }
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Trying to set non-existing topicPartition's data results in NONODE responses
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
+      zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
+        _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
+      zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
+        eraseMetadataAndStat}.toList)
+
+
+    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
+
+    // Other ZK client can also write the state of a partition
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
+      otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
+        eraseMetadataAndStat}.toList)
+  }
+
+  @Test
+  def testReassignPartitionsInProgress() {
+    assertFalse(zkClient.reassignPartitionsInProgress)
+    zkClient.createRecursive(ReassignPartitionsZNode.path)
+    assertTrue(zkClient.reassignPartitionsInProgress)
+  }
+
+  @Test
+  def testGetTopicPartitionStates() {
+    assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
+    assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
+    assertEquals(
+      initialLeaderIsrAndControllerEpochs,
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
+    )
+
+    assertEquals(
+      Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionState(topicPartition10)
+    )
+
+    assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
+
+    val notExistingPartition = new TopicPartition(topic1, 2)
+    assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
+    assertEquals(
+      Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
+    )
+
+    assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
+    assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
+
+  }
+
+  private def eraseMetadataAndStat(response: SetDataResponse) = {
+    val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
+    response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+  }
+
+  @Test
+  def testControllerEpochMethods() {
+    assertEquals(None, zkClient.getControllerEpoch)
+
+    assertEquals("Setting non existing nodes should return NONODE results",
+      SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+
+    assertEquals("Creating non existing nodes is OK",
+      CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+    assertEquals(0, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Attemt to create existing nodes should return NODEEXISTS",
+      CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+
+    assertEquals("Updating existing nodes is OK",
+      SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+    assertEquals(1, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Updating with wrong ZK version returns BADVERSION",
+      SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+  }
+
+  @Test
+  def testControllerManagementMethods() {
+    // No controller
+    assertEquals(None, zkClient.getControllerId)
+    // Create controller
+    zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+    assertEquals(Some(1), zkClient.getControllerId)
+    zkClient.deleteController()
+    assertEquals(None, zkClient.getControllerId)
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    val mockPath = "/foo"
+
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+
+      override val path: String = mockPath
+    }
+
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
+    zkClient.createRecursive(mockPath)
+    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
   @Test
@@ -458,7 +964,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
 
-    val topic1 = "topic1"
     val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
 
     zkClient.createPreferredReplicaElection(electionPartitions)
@@ -498,6 +1003,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     // test non-existent token
     assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+    assertFalse(zkClient.deleteDelegationToken(tokenId))
 
     // create a token
     zkClient.setOrCreateDelegationToken(token)
@@ -511,5 +1017,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     //test updated token
     assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+    //test deleting token
+    assertTrue(zkClient.deleteDelegationToken(tokenId))
+    assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
   }
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a122297..f9cb8e3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
 import org.junit.{After, AfterClass, Before, BeforeClass}
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.test.IntegrationTest
 import org.junit.experimental.categories.Category
-
 import scala.collection.Set
 import scala.collection.JavaConverters._
+
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
@@ -45,6 +45,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkClient: KafkaZkClient = null
+  var otherZkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null
 
   var zookeeper: EmbeddedZookeeper = null
@@ -55,15 +56,22 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
-    zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+    zkClient = createZkClient
+    otherZkClient = createZkClient
     adminZkClient = new AdminZkClient(zkClient)
   }
 
+  protected def createZkClient = {
+    KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
   @After
   def tearDown() {
     if (zkClient != null)
-     zkClient.close()
+      zkClient.close()
+    if (otherZkClient != null)
+      otherZkClient.close()
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown(), this)
     Configuration.setConfiguration(null)

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

Mime
View raw message