kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: MINOR: Avoid intermediate strings when parsing/decoding ZK JSON
Date Thu, 07 Dec 2017 00:38:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b00a9fc7c -> b73c765d7


MINOR: Avoid intermediate strings when parsing/decoding ZK JSON

Also:
- Fix bug in result type of `createSequentialPersistentPath`
- Remove duplicated code from `ReplicationUtils`
- Move `propagateIsrChanges` from `ReplicationUtils` to `KafkaZkClient`
- Add tests
- Minor clean-ups

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Ted Yu <yuzhihong@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #4261 from ijuma/zk-data-improvements


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b73c765d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b73c765d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b73c765d

Branch: refs/heads/trunk
Commit: b73c765d7e172de4742a3aa023d5a0a4b7387247
Parents: b00a9fc
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Dec 7 02:38:34 2017 +0200
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Dec 7 02:38:34 2017 +0200

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |   2 +-
 .../main/scala/kafka/cluster/Partition.scala    |   6 +-
 .../ZkNodeChangeNotificationListener.scala      |   2 +-
 .../transaction/ProducerIdManager.scala         |  24 ++--
 .../security/auth/SimpleAclAuthorizer.scala     |   5 +-
 .../kafka/server/DynamicConfigManager.scala     |  26 +++--
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 .../scala/kafka/utils/ReplicationUtils.scala    |  75 +++---------
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  29 ++++-
 .../src/main/scala/kafka/zk/AdminZkClient.scala |   6 +
 .../src/main/scala/kafka/zk/KafkaZkClient.scala |  52 +++++----
 core/src/main/scala/kafka/zk/ZkData.scala       |  13 +--
 .../ZkNodeChangeNotificationListenerTest.scala  |   6 +-
 .../transaction/ProducerIdManagerTest.scala     |  14 +--
 .../kafka/server/DynamicConfigChangeTest.scala  |  11 +-
 .../unit/kafka/utils/ReplicationUtilsTest.scala |  60 ++++------
 .../scala/unit/kafka/utils/ZkUtilsTest.scala    |  23 ++++
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 116 ++++++++++++++-----
 18 files changed, 271 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 6665d25..4d0ad58 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -268,7 +268,7 @@ object AdminUtils extends Logging with AdminUtilities {
   * @param validateOnly If true, validate the parameters without actually adding the partitions
   * @return the updated replica assignment
   */
- @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def addPartitions(zkUtils: ZkUtils,
                     topic: String,
                     existingAssignment: Map[Int, Seq[Int]],

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 91b86ee..8d8c2b3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -663,10 +663,10 @@ class Partition(val topic: String,
 
   private def updateIsr(newIsr: Set[Replica]) {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
-    val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId,
-      newLeaderAndIsr, controllerEpoch, zkVersion)
+    val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition, newLeaderAndIsr,
+      controllerEpoch)
 
-    if(updateSucceeded) {
+    if (updateSucceeded) {
       replicaManager.recordIsrChange(topicPartition)
       inSyncReplicas = newIsr
       zkVersion = newVersion

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 4cae80c..0783f61 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Time
  * Handle the notificationMessage.
  */
 trait NotificationHandler {
-  def processNotification(notificationMessage: String)
+  def processNotification(notificationMessage: Array[Byte])
 }
 
 /**
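
For reference, a minimal sketch of a handler under the new byte-based contract (the handler object and its logging body are illustrative, not part of this commit):

    import java.nio.charset.StandardCharsets

    import kafka.common.NotificationHandler

    object LoggingNotificationHandler extends NotificationHandler {
      // Decode to a String only at the point where a readable form is needed
      override def processNotification(notificationMessage: Array[Byte]): Unit =
        println(s"notification: ${new String(notificationMessage, StandardCharsets.UTF_8)}")
    }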

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 6be3c6b..5d32085 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -16,6 +16,8 @@
  */
 package kafka.coordinator.transaction
 
+import java.nio.charset.StandardCharsets
+
 import kafka.common.KafkaException
 import kafka.utils.{Json, Logging, ZkUtils}
 import kafka.zk.KafkaZkClient
@@ -31,17 +33,17 @@ object ProducerIdManager extends Logging {
   val CurrentVersion: Long = 1L
   val PidBlockSize: Long = 1000L
 
-  def generateProducerIdBlockJson(producerIdBlock: ProducerIdBlock): String = {
-    Json.encode(Map("version" -> CurrentVersion,
+  def generateProducerIdBlockJson(producerIdBlock: ProducerIdBlock): Array[Byte] = {
+    Json.encodeAsBytes(Map("version" -> CurrentVersion,
       "broker" -> producerIdBlock.brokerId,
       "block_start" -> producerIdBlock.blockStartId.toString,
       "block_end" -> producerIdBlock.blockEndId.toString)
     )
   }
 
-  def parseProducerIdBlockData(jsonData: String): ProducerIdBlock = {
+  def parseProducerIdBlockData(jsonData: Array[Byte]): ProducerIdBlock = {
     try {
-      Json.parseFull(jsonData).map(_.asJsonObject).flatMap { js =>
+      Json.parseBytes(jsonData).map(_.asJsonObject).flatMap { js =>
         val brokerId = js("broker").to[Int]
         val blockStart = js("block_start").to[String].toLong
         val blockEnd = js("block_end").to[String].toLong
@@ -115,21 +117,19 @@ class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends
     }
   }
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: String): (Boolean, Int) = {
+  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = {
     try {
       val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath)
-      dataOpt match {
-        case Some(data) =>
+      zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath) match {
+        case (Some(data), zkVersion) =>
           val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
           (currProducerIdBLock == expectedPidBlock, zkVersion)
-        case None =>
-          (false, -1)
+        case (None, _) => (false, -1)
       }
     } catch {
       case e: Exception =>
-        warn(s"Error while checking for producerId block Zk data on path $path: expected data $expectedData", e)
-
+        warn(s"Error while checking for producerId block Zk data on path $path: expected data " +
+          s"${new String(expectedData, StandardCharsets.UTF_8)}", e)
         (false, -1)
     }
   }
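
`Json.encodeAsBytes` and `Json.parseBytes` are Kafka-internal helpers backed by Jackson; the intermediate-string-free round trip this change relies on looks roughly like the following with plain Jackson (field values are made up, and `block_start`/`block_end` are encoded as strings just as `generateProducerIdBlockJson` does):

    import com.fasterxml.jackson.databind.ObjectMapper

    object BytesRoundTripSketch {
      private val mapper = new ObjectMapper

      def main(args: Array[String]): Unit = {
        val block = mapper.createObjectNode
        block.put("version", 1L).put("broker", 0)
          .put("block_start", "0").put("block_end", "999")
        // Serialize straight to UTF-8 bytes, skipping the String round trip
        val bytes: Array[Byte] = mapper.writeValueAsBytes(block)
        // Parse the bytes back without materializing a String first
        val parsed = mapper.readTree(bytes)
        println(parsed.get("block_end").asText)
      }
    }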

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index e10bfa1..80d85a0 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -16,6 +16,7 @@
  */
 package kafka.security.auth
 
+import java.nio.charset.StandardCharsets
 import java.util
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -313,8 +314,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   object AclChangedNotificationHandler extends NotificationHandler {
-    override def processNotification(notificationMessage: String) {
-      val resource: Resource = Resource.fromString(notificationMessage)
+    override def processNotification(notificationMessage: Array[Byte]) {
+      val resource: Resource = Resource.fromString(new String(notificationMessage, StandardCharsets.UTF_8))
       inWriteLock(lock) {
         val versionedAcls = getAclsFromZk(resource)
         updateCache(resource, versionedAcls)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 457742d..84bcfe3 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import java.nio.charset.StandardCharsets
+
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 import kafka.utils.Json
 import kafka.utils.Logging
@@ -91,33 +93,34 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
   val adminZkClient = new AdminZkClient(zkClient)
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
-    override def processNotification(json: String) = {
+    override def processNotification(jsonBytes: Array[Byte]) = {
       // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
-      Json.parseFull(json).foreach { js =>
+      Json.parseBytes(jsonBytes).foreach { js =>
         val jsObject = js.asJsonObjectOption.getOrElse {
           throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
             """{"version" : 1, "entity_type":"topics/clients", "entity_name" : "topic_name/client_id"} or """ +
             """{"version" : 2, "entity_path":"entity_type/entity_name"}. """ +
-            s"Received: $json")
+            s"Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
         }
         jsObject("version").to[Int] match {
-          case 1 => processEntityConfigChangeVersion1(json, jsObject)
-          case 2 => processEntityConfigChangeVersion2(json, jsObject)
+          case 1 => processEntityConfigChangeVersion1(jsonBytes, jsObject)
+          case 2 => processEntityConfigChangeVersion2(jsonBytes, jsObject)
           case version => throw new IllegalArgumentException("Config change notification has unsupported version " +
             s"'$version', supported versions are 1 and 2.")
         }
       }
     }
 
-    private def processEntityConfigChangeVersion1(json: String, js: JsonObject) {
+    private def processEntityConfigChangeVersion1(jsonBytes: Array[Byte], js: JsonObject) {
       val validConfigTypes = Set(ConfigType.Topic, ConfigType.Client)
       val entityType = js.get("entity_type").flatMap(_.to[Option[String]]).filter(validConfigTypes).getOrElse {
         throw new IllegalArgumentException("Version 1 config change notification must have 'entity_type' set to " +
-          s"'clients' or 'topics'. Received: $json")
+          s"'clients' or 'topics'. Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
       }
 
       val entity = js.get("entity_name").flatMap(_.to[Option[String]]).getOrElse {
-        throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
+        throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. " +
+          s"Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
       }
 
       val entityConfig = adminZkClient.fetchEntityConfig(entityType, entity)
@@ -126,10 +129,11 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
 
     }
 
-    private def processEntityConfigChangeVersion2(json: String, js: JsonObject) {
+    private def processEntityConfigChangeVersion2(jsonBytes: Array[Byte], js: JsonObject) {
 
       val entityPath = js.get("entity_path").flatMap(_.to[Option[String]]).getOrElse {
-        throw new IllegalArgumentException(s"Version 2 config change notification must specify 'entity_path'. Received: $json")
+        throw new IllegalArgumentException(s"Version 2 config change notification must specify 'entity_path'. " +
+          s"Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
       }
 
       val index = entityPath.indexOf('/')
@@ -137,7 +141,7 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
       if (index < 0 || !configHandlers.contains(rootEntityType)) {
         val entityTypes = configHandlers.keys.map(entityType => s"'$entityType'/").mkString(", ")
         throw new IllegalArgumentException("Version 2 config change notification must have 'entity_path' starting with " +
-          s"one of $entityTypes. Received: $json")
+          s"one of $entityTypes. Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
       }
       val fullSanitizedEntityName = entityPath.substring(index + 1)
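
The two notification wire formats referenced in the error messages above can likewise be produced directly as bytes; a small sketch under the same Jackson assumption (the topic name is made up):

    import com.fasterxml.jackson.databind.ObjectMapper

    object ConfigChangeNotificationShapes {
      private val mapper = new ObjectMapper

      def main(args: Array[String]): Unit = {
        // Version 1: separate entity_type and entity_name fields
        val v1 = mapper.createObjectNode
        v1.put("version", 1).put("entity_type", "topics").put("entity_name", "my-topic")
        // Version 2: a single entity_path of the form entity_type/entity_name
        val v2 = mapper.createObjectNode
        v2.put("version", 2).put("entity_path", "topics/my-topic")
        // Bytes go straight to the notification znode, no String step
        println(mapper.writeValueAsBytes(v2).length)
      }
    }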
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 61c430a..593c876 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -266,7 +266,7 @@ class ReplicaManager(val config: KafkaConfig,
       if (isrChangeSet.nonEmpty &&
         (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
           lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
-        ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet)
+        zkClient.propagateIsrChanges(isrChangeSet)
         isrChangeSet.clear()
         lastIsrPropagationMs.set(now)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index f5751d2..33de22b 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -18,76 +18,39 @@
 package kafka.utils
 
 import kafka.api.LeaderAndIsr
-import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
-import kafka.utils.ZkUtils._
+import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk._
 import org.apache.kafka.common.TopicPartition
-import org.apache.zookeeper.data.Stat
-
-import scala.collection._
 
 object ReplicationUtils extends Logging {
 
-  private val IsrChangeNotificationPrefix = "isr_change_"
-
-  def updateLeaderAndIsr(zkClient: KafkaZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
-    zkVersion: Int): (Boolean,Int) = {
-    debug(s"Updated ISR for $topic-$partitionId to ${newLeaderAndIsr.isr.mkString(",")}")
-    val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId)
-    val newLeaderData = LeaderAndIsrZNode.encode(newLeaderAndIsr, controllerEpoch)
+  def updateLeaderAndIsr(zkClient: KafkaZkClient, partition: TopicPartition, newLeaderAndIsr: LeaderAndIsr,
+                         controllerEpoch: Int): (Boolean, Int) = {
+    debug(s"Updated ISR for $partition to ${newLeaderAndIsr.isr.mkString(",")}")
+    val path = TopicPartitionStateZNode.path(partition)
+    val newLeaderData = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(newLeaderAndIsr, controllerEpoch))
     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    val updatePersistentPath: (Boolean, Int) = zkClient.conditionalUpdatePath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
+    val updatePersistentPath: (Boolean, Int) = zkClient.conditionalUpdatePath(path, newLeaderData,
+      newLeaderAndIsr.zkVersion, Some(checkLeaderAndIsrZkData))
     updatePersistentPath
   }
 
-  def propagateIsrChanges(zkClient: KafkaZkClient, isrChangeSet: Set[TopicPartition]): Unit = {
-    val isrChangeNotificationPath: String = zkClient.createSequentialPersistentPath(
-      ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
-      generateIsrChangeJson(isrChangeSet))
-    debug(s"Added $isrChangeNotificationPath for $isrChangeSet")
-  }
-
-  private def checkLeaderAndIsrZkData(zkClient: KafkaZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = {
+  private def checkLeaderAndIsrZkData(zkClient: KafkaZkClient, path: String, expectedLeaderAndIsrInfo: Array[Byte]): (Boolean, Int) = {
     try {
       val (writtenLeaderOpt, writtenStat) = zkClient.getDataAndStat(path)
-      val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat)
-      writtenLeaderOpt.foreach { writtenData =>
-        val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
-        (expectedLeader,writtenLeader) match {
-          case (Some(expectedLeader),Some(writtenLeader)) =>
-            if(expectedLeader == writtenLeader)
-              return (true, writtenStat.getVersion())
-          case _ =>
+      val expectedLeaderOpt = TopicPartitionStateZNode.decode(expectedLeaderAndIsrInfo, writtenStat)
+      val succeeded = writtenLeaderOpt.map { writtenData =>
+        val writtenLeaderOpt = TopicPartitionStateZNode.decode(writtenData, writtenStat)
+        (expectedLeaderOpt, writtenLeaderOpt) match {
+          case (Some(expectedLeader), Some(writtenLeader)) if expectedLeader == writtenLeader => true
+          case _ => false
         }
-      }
+      }.getOrElse(false)
+      if (succeeded) (true, writtenStat.getVersion)
+      else (false, -1)
     } catch {
-      case _: Exception =>
+      case _: Exception => (false, -1)
     }
-    (false, -1)
-  }
-
-  def getLeaderIsrAndEpochForPartition(zkUtils: ZkUtils, topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
-    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
-    val (leaderAndIsrOpt, stat) = zkUtils.readDataMaybeNull(leaderAndIsrPath)
-    debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
-    leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
-  }
-
-  private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(leaderAndIsrStr).flatMap { js =>
-      val leaderIsrAndEpochInfo = js.asJsonObject
-      val leader = leaderIsrAndEpochInfo("leader").to[Int]
-      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
-      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
-      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
-      val zkPathVersion = stat.getVersion
-      trace(s"Leader $leader, Epoch $epoch, Isr $isr, Zk path version $zkPathVersion for leaderAndIsrPath $path")
-      Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))}
-  }
-
-  private def generateIsrChangeJson(isrChanges: Set[TopicPartition]): String = {
-    val partitions = isrChanges.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)).toArray
-    Json.encode(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitions))
   }
 
 }
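
The retry-safe update pattern behind `conditionalUpdatePath` plus `checkLeaderAndIsrZkData` can be sketched against the raw ZooKeeper client as below; this simplifies the check to byte equality, whereas the real checker decodes both sides and compares the decoded values:

    import java.util.Arrays

    import org.apache.zookeeper.{KeeperException, ZooKeeper}
    import org.apache.zookeeper.data.Stat

    object ConditionalUpdateSketch {
      // Attempt a versioned write; on a version conflict, check whether the data
      // already stored equals what we meant to write (a previous attempt that
      // actually succeeded), and if so report success with the stored version.
      def conditionalUpdate(zk: ZooKeeper, path: String, data: Array[Byte],
                            expectedVersion: Int): (Boolean, Int) =
        try {
          val stat = zk.setData(path, data, expectedVersion)
          (true, stat.getVersion)
        } catch {
          case _: KeeperException.BadVersionException =>
            val stat = new Stat
            val written = zk.getData(path, false, stat)
            if (written != null && Arrays.equals(written, data)) (true, stat.getVersion)
            else (false, -1)
        }
    }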

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index de2bc99..b378280 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -28,7 +28,6 @@ import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContex
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ConfigType
 import kafka.utils.ZkUtils._
-
 import com.yammer.metrics.core.MetricName
 import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
@@ -348,8 +347,23 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
     brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get)
   }
 
-  def getLeaderAndIsrForPartition(topic: String, partition: Int):Option[LeaderAndIsr] = {
-    ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topic, partition).map(_.leaderAndIsr)
+  def getLeaderAndIsrForPartition(topic: String, partition: Int): Option[LeaderAndIsr] = {
+    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
+    val (leaderAndIsrOpt, stat) = readDataMaybeNull(leaderAndIsrPath)
+    debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
+    leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat).map(_.leaderAndIsr))
+  }
+
+  private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
+    Json.parseFull(leaderAndIsrStr).flatMap { js =>
+      val leaderIsrAndEpochInfo = js.asJsonObject
+      val leader = leaderIsrAndEpochInfo("leader").to[Int]
+      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
+      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
+      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
+      val zkPathVersion = stat.getVersion
+      trace(s"Leader $leader, Epoch $epoch, Isr $isr, Zk path version $zkPathVersion for leaderAndIsrPath $path")
+      Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))}
   }
 
   def setupCommonPaths() {
@@ -759,13 +773,20 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     for(topicAndPartition <- topicAndPartitions) {
-      ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition).foreach { leaderIsrAndControllerEpoch =>
+      getLeaderIsrAndEpochForPartition(topicAndPartition.topic, topicAndPartition.partition).foreach { leaderIsrAndControllerEpoch =>
         ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
       }
     }
     ret
   }
 
+  private[utils] def getLeaderIsrAndEpochForPartition(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
+    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
+    val (leaderAndIsrOpt, stat) = readDataMaybeNull(leaderAndIsrPath)
+    debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
+    leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
+  }
+  
   def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
     val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
     topics.foreach { topic =>
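
For reference, the per-field decoding that `parseLeaderAndIsr` performs can be reproduced with plain Jackson directly from bytes; a sketch with made-up sample values, using the same field names as the partition state znode parsed above:

    import java.nio.charset.StandardCharsets

    import com.fasterxml.jackson.databind.ObjectMapper

    object LeaderAndIsrParseSketch {
      private val mapper = new ObjectMapper

      def main(args: Array[String]): Unit = {
        val bytes = """{"leader":1,"leader_epoch":2,"isr":[1,2],"controller_epoch":3}"""
          .getBytes(StandardCharsets.UTF_8)
        val js = mapper.readTree(bytes) // no intermediate String needed
        var isr = List.empty[Int]
        val it = js.get("isr").elements()
        while (it.hasNext) isr = isr :+ it.next().asInt
        println(s"leader=${js.get("leader").asInt} epoch=${js.get("leader_epoch").asInt} isr=$isr")
      }
    }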

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/zk/AdminZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index e00b8e6..fe64414 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -30,6 +30,12 @@ import org.apache.zookeeper.KeeperException.NodeExistsException
 
 import scala.collection.{Map, Seq}
 
+/**
+ * Provides admin related methods for interacting with ZooKeeper.
+ *
+ * This is an internal class and no compatibility guarantees are provided,
+ * see org.apache.kafka.clients.admin.AdminClient for publicly supported APIs.
+ */
 class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index c035237..97ae11e 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -16,7 +16,6 @@
 */
 package kafka.zk
 
-import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 
 import com.yammer.metrics.core.MetricName
@@ -59,11 +58,19 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
 
   import KafkaZkClient._
 
-  def createSequentialPersistentPath(path: String, data: String = ""): String = {
-    val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+  /**
+   * Create a sequential persistent path. That is, the znode will not be automatically deleted upon client's disconnect
+   * and a monotonically increasing number will be appended to its name.
+   *
+   * @param path the path to create (with the monotonically increasing number appended)
+   * @param data the znode data
+   * @return the created path (including the appended monotonically increasing number)
+   */
+  private[zk] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = {
+    val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow
-    createResponse.path
+    createResponse.name
   }
 
   /**
@@ -131,7 +138,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
   }
 
   /**
-   * Try to update the partition states of multiple partitions in zookeeper.
+   * Update the partition states of multiple partitions in zookeeper.
    * @param leaderAndIsrs The partition states to update.
    * @param controllerEpoch The current controller epoch.
    * @return UpdateLeaderAndIsrResult instance containing per partition results.
@@ -140,7 +147,9 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
     val successfulUpdates = mutable.Map.empty[TopicPartition, LeaderAndIsr]
     val updatesToRetry = mutable.Buffer.empty[TopicPartition]
     val failed = mutable.Map.empty[TopicPartition, Exception]
-    val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) => partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) }
+    val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) =>
+      partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    }
     val setDataResponses = try {
       setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
     } catch {
@@ -477,11 +486,11 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
   /**
    * Gets the data and version at the given zk path
    * @param path zk node path
-   * @return A tuple of 2 elements, where first element is zk node data as string
+   * @return A tuple of 2 elements, where first element is zk node data as an array of bytes
    *         and second element is zk node version.
    *         returns (None, ZkVersion.NoVersion) if node doesn't exists and throws exception for any error
    */
-  def getDataAndVersion(path: String): (Option[String], Int) = {
+  def getDataAndVersion(path: String): (Option[Array[Byte]], Int) = {
     val (data, stat) = getDataAndStat(path)
     stat match {
       case ZkStat.NoStat => (data, ZkVersion.NoVersion)
@@ -492,22 +501,16 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
   /**
    * Gets the data and Stat at the given zk path
    * @param path zk node path
-   * @return A tuple of 2 elements, where first element is zk node data as string
+   * @return A tuple of 2 elements, where first element is zk node data as an array of bytes
    *         and second element is zk node stats.
    *         returns (None, ZkStat.NoStat) if node doesn't exists and throws exception for any error
    */
-  def getDataAndStat(path: String): (Option[String], Stat) = {
+  def getDataAndStat(path: String): (Option[Array[Byte]], Stat) = {
     val getDataRequest = GetDataRequest(path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
 
     getDataResponse.resultCode match {
-      case Code.OK =>
-        if (getDataResponse.data == null)
-          (None, getDataResponse.stat)
-        else {
-          val data = Option(getDataResponse.data).map(new String(_, UTF_8))
-          (data, getDataResponse.stat)
-        }
+      case Code.OK => (Option(getDataResponse.data), getDataResponse.stat)
       case Code.NONODE => (None, ZkStat.NoStat)
       case _ => throw getDataResponse.resultException.get
     }
@@ -535,10 +538,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
    * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one).
    * In this case, we will run the optionalChecker to further check if the previous write did indeed succeeded.
    */
-  def conditionalUpdatePath(path: String, data: String, expectVersion: Int,
-                            optionalChecker:Option[(KafkaZkClient, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
+  def conditionalUpdatePath(path: String, data: Array[Byte], expectVersion: Int,
+                            optionalChecker: Option[(KafkaZkClient, String, Array[Byte]) => (Boolean,Int)] = None): (Boolean, Int) = {
 
-    val setDataRequest = SetDataRequest(path, data.getBytes(UTF_8), expectVersion)
+    val setDataRequest = SetDataRequest(path, data, expectVersion)
     val setDataResponse = retryRequestUntilConnected(setDataRequest)
 
     setDataResponse.resultCode match {
@@ -578,7 +581,6 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
     createRecursive(DeleteTopicsTopicZNode.path(topicName))
   }
 
-
   /**
    * Checks if topic is marked for deletion
    * @param topic
@@ -938,10 +940,16 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
   def propagateLogDirEvent(brokerId: Int) {
     val logDirEventNotificationPath: String = createSequentialPersistentPath(
       LogDirEventNotificationZNode.path + "/" + LogDirEventNotificationSequenceZNode.SequenceNumberPrefix,
-      new String(LogDirEventNotificationSequenceZNode.encode(brokerId), UTF_8))
+      LogDirEventNotificationSequenceZNode.encode(brokerId))
     debug(s"Added $logDirEventNotificationPath for broker $brokerId")
   }
 
+  def propagateIsrChanges(isrChangeSet: collection.Set[TopicPartition]): Unit = {
+    val isrChangeNotificationPath: String = createSequentialPersistentPath(IsrChangeNotificationSequenceZNode.path(),
+      IsrChangeNotificationSequenceZNode.encode(isrChangeSet))
+    debug(s"Added $isrChangeNotificationPath for $isrChangeSet")
+  }
+
   /**
    * Deletes all Acl change notifications.
    * @throws KeeperException if there is an error while deleting Acl change notifications
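
The `createResponse.name` fix above matters because, for sequential znodes, the raw ZooKeeper `create` call returns the actual node name with the sequence suffix appended, not the requested prefix. A minimal sketch against the plain client (connection handling and ACL choice are illustrative):

    import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

    object SequentialCreateSketch {
      def createSequential(zk: ZooKeeper, prefix: String, data: Array[Byte]): String =
        // Returns e.g. "/isr_change_notification/isr_change_0000000007";
        // returning the request path would silently drop the sequence number
        zk.create(prefix, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL)
    }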

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index be6efca..8bd32d0 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -30,13 +30,6 @@ import org.apache.zookeeper.data.Stat
 
 // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
 
-object LeaderAndIsrZNode {
-  def encode(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
-    Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
-                "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
-  }
-}
-
 object ControllerZNode {
   def path = "/controller"
   def encode(brokerId: Int, timestamp: Long): Array[Byte] =
@@ -174,8 +167,8 @@ object IsrChangeNotificationZNode {
 
 object IsrChangeNotificationSequenceZNode {
   val SequenceNumberPrefix = "isr_change_"
-  def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
-  def encode(partitions: Set[TopicPartition]): Array[Byte] = {
+  def path(sequenceNumber: String = "") = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
+  def encode(partitions: collection.Set[TopicPartition]): Array[Byte] = {
     val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
     Json.encodeAsBytes(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitionsJson))
   }
@@ -317,4 +310,4 @@ object AclChangeNotificationSequenceZNode {
   def deletePath(sequenceNode: String) = s"${AclChangeNotificationZNode.path}/${sequenceNode}"
   def encode(resourceName : String): Array[Byte] = resourceName.getBytes(UTF_8)
   def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 99550d5..e46bd9b 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.common
 
+import java.nio.charset.StandardCharsets
+
 import kafka.utils.TestUtils
 import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, ZooKeeperTestHarness}
 import org.junit.Test
@@ -27,8 +29,8 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
     @volatile var notification: String = null
     @volatile var invocationCount = 0
     val notificationHandler = new NotificationHandler {
-      override def processNotification(notificationMessage: String): Unit = {
-        notification = notificationMessage
+      override def processNotification(notificationMessage: Array[Byte]): Unit = {
+        notification = new String(notificationMessage, StandardCharsets.UTF_8)
         invocationCount += 1
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index c5b42d4..88aebd3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -34,17 +34,17 @@ class ProducerIdManagerTest {
   @Test
   def testGetProducerId() {
     var zkVersion: Option[Int] = None
-    var data: String = null
-    EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
-      override def answer(): (Option[String], Int) = zkVersion.map(Some(data) -> _).getOrElse(None, 0)
+    var data: Array[Byte] = null
+    EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[Array[Byte]], Int)] {
+      override def answer(): (Option[Array[Byte]], Int) = zkVersion.map(Some(data) -> _).getOrElse(None, 0)
     }).anyTimes()
 
     val capturedVersion: Capture[Int] = EasyMock.newCapture()
-    val capturedData: Capture[String] = EasyMock.newCapture()
+    val capturedData: Capture[Array[Byte]] = EasyMock.newCapture()
     EasyMock.expect(zkClient.conditionalUpdatePath(EasyMock.anyString(),
       EasyMock.capture(capturedData),
       EasyMock.capture(capturedVersion),
-      EasyMock.anyObject[Option[(KafkaZkClient, String, String) => (Boolean, Int)]])).andAnswer(new IAnswer[(Boolean, Int)] {
+      EasyMock.anyObject[Option[(KafkaZkClient, String, Array[Byte]) => (Boolean, Int)]])).andAnswer(new IAnswer[(Boolean, Int)] {
         override def answer(): (Boolean, Int) = {
           val newZkVersion = capturedVersion.getValue + 1
           zkVersion = Some(newZkVersion)
@@ -76,8 +76,8 @@ class ProducerIdManagerTest {
 
   @Test(expected = classOf[KafkaException])
   def testExceedProducerIdLimit() {
-    EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
-      override def answer(): (Option[String], Int) = {
+    EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[Array[Byte]], Int)] {
+      override def answer(): (Option[Array[Byte]], Int) = {
         val json = ProducerIdManager.generateProducerIdBlockJson(
           ProducerIdBlock(0, Long.MaxValue - ProducerIdManager.PidBlockSize, Long.MaxValue))
         (Some(json), 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 85bd6a1..61da420 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -16,6 +16,7 @@
   */
 package kafka.server
 
+import java.nio.charset.StandardCharsets
 import java.util.Properties
 
 import kafka.log.LogConfig._
@@ -189,12 +190,12 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
     val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
     // Notifications created using the old TopicConfigManager are ignored.
-    configManager.ConfigChangedNotificationHandler.processNotification("not json")
+    configManager.ConfigChangedNotificationHandler.processNotification("not json".getBytes(StandardCharsets.UTF_8))
 
     // Incorrect Map. No version
     try {
       val jsonMap = Map("v" -> 1, "x" -> 2)
-      configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
+      configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap))
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
@@ -203,7 +204,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Version is provided. EntityType is incorrect
     try {
       val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x")
-      configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
+      configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap))
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
@@ -213,7 +214,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // EntityName isn't provided
     try {
       val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic)
-      configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
+      configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap))
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
@@ -222,7 +223,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
     // Everything is provided
     val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x")
-    configManager.ConfigChangedNotificationHandler.processNotification(Json.encode(jsonMap))
+    configManager.ConfigChangedNotificationHandler.processNotification(Json.encodeAsBytes(jsonMap))
 
     // Verify that processConfigChanges was only called once
     EasyMock.verify(handler)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index ebab756..987160d 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.utils
 
-import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.server.{KafkaConfig, ReplicaFetcherManager}
 import kafka.api.LeaderAndIsr
 import kafka.zk.ZooKeeperTestHarness
@@ -26,23 +25,17 @@ import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.easymock.EasyMock
 
-
 class ReplicationUtilsTest extends ZooKeeperTestHarness {
-  val topic = "my-topic-test"
-  val partitionId = 0
-  val brokerId = 1
-  val leaderEpoch = 1
-  val controllerEpoch = 1
-  val zkVersion = 1
-  val topicPath = "/brokers/topics/my-topic-test/partitions/0/state"
-  val topicData = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
-    "versions" -> 1, "leader_epoch" -> 1,"isr" -> List(1,2)))
-  val topicDataVersionMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
-    "versions" -> 2, "leader_epoch" -> 1,"isr" -> List(1,2)))
-  val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
-    "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
-
-  val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0), controllerEpoch)
+  private val zkVersion = 1
+  private val topic = "my-topic-test"
+  private val partition = 0
+  private val leader = 1
+  private val leaderEpoch = 1
+  private val controllerEpoch = 1
+  private val isr = List(1, 2)
+  private val topicPath = s"/brokers/topics/$topic/partitions/$partition/state"
+  private val topicData = Json.encode(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
+    "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr))
 
   @Before
   override def setUp() {
@@ -59,7 +52,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(new TopicPartition(topic, partitionId), false)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.getLog(new TopicPartition(topic, partition), false)).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
     val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
@@ -71,36 +64,29 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
 
     zkUtils.makeSurePersistentPathExists(ZkUtils.IsrChangeNotificationPath)
 
-    val replicas = List(0,1)
+    val replicas = List(0, 1)
 
     // regular update
-    val newLeaderAndIsr1 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, 0)
-    val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkClient,
-      "my-topic-test", partitionId, newLeaderAndIsr1, controllerEpoch, 0)
+    val newLeaderAndIsr1 = new LeaderAndIsr(leader, leaderEpoch, replicas, 0)
+    val (updateSucceeded1, newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkClient,
+      new TopicPartition(topic, partition), newLeaderAndIsr1, controllerEpoch)
     assertTrue(updateSucceeded1)
     assertEquals(newZkVersion1, 1)
 
     // mismatched zkVersion with the same data
-    val newLeaderAndIsr2 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, zkVersion + 1)
-    val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkClient,
-      "my-topic-test", partitionId, newLeaderAndIsr2, controllerEpoch, zkVersion + 1)
+    val newLeaderAndIsr2 = new LeaderAndIsr(leader, leaderEpoch, replicas, zkVersion + 1)
+    val (updateSucceeded2, newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkClient,
+      new TopicPartition(topic, partition), newLeaderAndIsr2, controllerEpoch)
     assertTrue(updateSucceeded2)
     // returns true with existing zkVersion
-    assertEquals(newZkVersion2,1)
+    assertEquals(newZkVersion2, 1)
 
     // mismatched zkVersion and leaderEpoch
-    val newLeaderAndIsr3 = new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion + 1)
-    val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkClient,
-      "my-topic-test", partitionId, newLeaderAndIsr3, controllerEpoch, zkVersion + 1)
+    val newLeaderAndIsr3 = new LeaderAndIsr(leader, leaderEpoch + 1, replicas, zkVersion + 1)
+    val (updateSucceeded3, newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkClient,
+      new TopicPartition(topic, partition), newLeaderAndIsr3, controllerEpoch)
     assertFalse(updateSucceeded3)
-    assertEquals(newZkVersion3,-1)
-  }
-
-  @Test
-  def testGetLeaderIsrAndEpochForPartition() {
-    val leaderIsrAndControllerEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partitionId)
-    assertEquals(topicDataLeaderIsrAndControllerEpoch, leaderIsrAndControllerEpoch.get)
-    assertEquals(None, ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partitionId + 1))
+    assertEquals(newZkVersion3, -1)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
index 1ad37fc..ecd0706 100755
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -17,7 +17,9 @@
 
 package kafka.utils
 
+import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
+import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.Assert._
 import org.junit.Test
@@ -84,4 +86,25 @@ class ZkUtilsTest extends ZooKeeperTestHarness {
 
     assertEquals(Set(TopicAndPartition(topic, 0)), zkUtils.getAllPartitions())
   }
+
+  @Test
+  def testGetLeaderIsrAndEpochForPartition() {
+    val topic = "my-topic-test"
+    val partition = 0
+    val leader = 1
+    val leaderEpoch = 1
+    val controllerEpoch = 1
+    val isr = List(1, 2)
+    val topicPath = s"/brokers/topics/$topic/partitions/$partition/state"
+    val topicData = Json.encode(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
+      "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr))
+    zkUtils.createPersistentPath(topicPath, topicData)
+
+    val leaderIsrAndControllerEpoch = zkUtils.getLeaderIsrAndEpochForPartition(topic, partition)
+    val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, 0),
+      controllerEpoch)
+    assertEquals(topicDataLeaderIsrAndControllerEpoch, leaderIsrAndControllerEpoch.get)
+    assertEquals(None, zkUtils.getLeaderIsrAndEpochForPartition(topic, partition + 1))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b73c765d/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 28d8430..ae0faed 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -17,6 +17,7 @@
 package kafka.zk
 
 import java.util.{Properties, UUID}
+import java.nio.charset.StandardCharsets.UTF_8
 
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -129,23 +130,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     val path = "/testpath"
 
     // test with non-existing path
-    var dataAndVersion = zkClient.getDataAndVersion(path)
-    assertTrue(dataAndVersion._1.isEmpty)
-    assertEquals(-1, dataAndVersion._2)
+    val (data0, version0) = zkClient.getDataAndVersion(path)
+    assertTrue(data0.isEmpty)
+    assertEquals(-1, version0)
 
     // create a test path
     zkClient.createRecursive(path)
-    zkClient.conditionalUpdatePath(path, "version1", 0)
+    zkClient.conditionalUpdatePath(path, "version1".getBytes(UTF_8), 0)
 
     // test with existing path
-    dataAndVersion = zkClient.getDataAndVersion(path)
-    assertEquals("version1", dataAndVersion._1.get)
-    assertEquals(1, dataAndVersion._2)
-
-    zkClient.conditionalUpdatePath(path, "version2", 1)
-    dataAndVersion = zkClient.getDataAndVersion(path)
-    assertEquals("version2", dataAndVersion._1.get)
-    assertEquals(2, dataAndVersion._2)
+    val (data1, version1) = zkClient.getDataAndVersion(path)
+    assertEquals("version1", new String(data1.get, UTF_8))
+    assertEquals(1, version1)
+
+    zkClient.conditionalUpdatePath(path, "version2".getBytes(UTF_8), 1)
+    val (data2, version2) = zkClient.getDataAndVersion(path)
+    assertEquals("version2", new String(data2.get, UTF_8))
+    assertEquals(2, version2)
   }
 
   @Test
@@ -153,7 +154,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     val path = "/testconditionalpath"
 
     // test with non-existing path
-    var statusAndVersion = zkClient.conditionalUpdatePath(path, "version0", 0)
+    var statusAndVersion = zkClient.conditionalUpdatePath(path, "version0".getBytes(UTF_8), 0)
     assertFalse(statusAndVersion._1)
     assertEquals(-1, statusAndVersion._2)
 
@@ -161,17 +162,72 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.createRecursive(path)
 
     // test with valid expected version
-    statusAndVersion = zkClient.conditionalUpdatePath(path, "version1", 0)
+    statusAndVersion = zkClient.conditionalUpdatePath(path, "version1".getBytes(UTF_8), 0)
     assertTrue(statusAndVersion._1)
     assertEquals(1, statusAndVersion._2)
 
     // test with invalid expected version
-    statusAndVersion = zkClient.conditionalUpdatePath(path, "version2", 2)
+    statusAndVersion = zkClient.conditionalUpdatePath(path, "version2".getBytes(UTF_8), 2)
     assertFalse(statusAndVersion._1)
     assertEquals(-1, statusAndVersion._2)
   }
 
   @Test
+  def testCreateSequentialPersistentPath(): Unit = {
+    val path = "/testpath"
+    zkClient.createRecursive(path)
+
+    var result = zkClient.createSequentialPersistentPath(path + "/sequence_", null)
+    assertEquals(s"$path/sequence_0000000000", result)
+    assertTrue(zkClient.pathExists(s"$path/sequence_0000000000"))
+    assertEquals(None, dataAsString(s"$path/sequence_0000000000"))
+
+    result = zkClient.createSequentialPersistentPath(path + "/sequence_", "some value".getBytes(UTF_8))
+    assertEquals(s"$path/sequence_0000000001", result)
+    assertTrue(zkClient.pathExists(s"$path/sequence_0000000001"))
+    assertEquals(Some("some value"), dataAsString(s"$path/sequence_0000000001"))
+  }
+
+  @Test
+  def testPropagateIsrChanges(): Unit = {
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(new TopicPartition("topic-a", 0), new TopicPartition("topic-b", 0)))
+    var expectedPath = "/isr_change_notification/isr_change_0000000000"
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some("""{"version":1,"partitions":[{"topic":"topic-a","partition":0},{"topic":"topic-b","partition":0}]}"""),
+      dataAsString(expectedPath))
+
+    zkClient.propagateIsrChanges(Set(new TopicPartition("topic-b", 0)))
+    expectedPath = "/isr_change_notification/isr_change_0000000001"
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some("""{"version":1,"partitions":[{"topic":"topic-b","partition":0}]}"""), dataAsString(expectedPath))
+  }
+
+  @Test
+  def testPropagateLogDir(): Unit = {
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+
+    zkClient.propagateLogDirEvent(brokerId)
+    var expectedPath = "/log_dir_event_notification/log_dir_event_0000000000"
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some("""{"version":1,"broker":3,"event":1}"""), dataAsString(expectedPath))
+
+    zkClient.propagateLogDirEvent(brokerId)
+    expectedPath = "/log_dir_event_notification/log_dir_event_0000000001"
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some("""{"version":1,"broker":3,"event":1}"""), dataAsString(expectedPath))
+
+    val anotherBrokerId = 4
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+    expectedPath = "/log_dir_event_notification/log_dir_event_0000000002"
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some("""{"version":1,"broker":4,"event":1}"""), dataAsString(expectedPath))
+  }
+
+  @Test
   def testSetGetAndDeletePartitionReassignment() {
     zkClient.createRecursive(AdminZNode.path)
 
@@ -202,23 +258,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     val path = "/testpath"
 
     // test with non-existing path
-    var dataAndVersion = zkClient.getDataAndStat(path)
-    assertTrue(dataAndVersion._1.isEmpty)
-    assertEquals(0, dataAndVersion._2.getVersion)
+    val (data0, version0) = zkClient.getDataAndStat(path)
+    assertTrue(data0.isEmpty)
+    assertEquals(0, version0.getVersion)
 
     // create a test path
     zkClient.createRecursive(path)
-    zkClient.conditionalUpdatePath(path, "version1", 0)
+    zkClient.conditionalUpdatePath(path, "version1".getBytes(UTF_8), 0)
 
     // test with existing path
-    dataAndVersion = zkClient.getDataAndStat(path)
-    assertEquals("version1", dataAndVersion._1.get)
-    assertEquals(1, dataAndVersion._2.getVersion)
-
-    zkClient.conditionalUpdatePath(path, "version2", 1)
-    dataAndVersion = zkClient.getDataAndStat(path)
-    assertEquals("version2", dataAndVersion._1.get)
-    assertEquals(2, dataAndVersion._2.getVersion)
+    val (data1, version1) = zkClient.getDataAndStat(path)
+    assertEquals("version1", new String(data1.get, UTF_8))
+    assertEquals(1, version1.getVersion)
+
+    zkClient.conditionalUpdatePath(path, "version2".getBytes(UTF_8), 1)
+    val (data2, version2) = zkClient.getDataAndStat(path)
+    assertEquals("version2", new String(data2.get, UTF_8))
+    assertEquals(2, version2.getVersion)
   }
 
   @Test
@@ -353,4 +409,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.deleteTopicConfigs(Seq(topic1, topic2))
     assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
   }
+
+  private def dataAsString(path: String): Option[String] = {
+    val (data, _) = zkClient.getDataAndStat(path)
+    data.map(new String(_, UTF_8))
+  }
+
 }

