kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [23/37] git commit: standardizing json values stored in ZK; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-755
Date Mon, 04 Mar 2013 04:22:01 GMT
standardizing json values stored in ZK; patched by Swapnil Ghike; reviewed by Jun Rao; kafka-755


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/828ce83d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/828ce83d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/828ce83d

Branch: refs/heads/trunk
Commit: 828ce83dcb442d1cc6c0dbc538d12bbb04a0884d
Parents: 144a0a2
Author: Jun Rao <junrao@gmail.com>
Authored: Fri Feb 22 16:43:54 2013 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Feb 22 16:43:54 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala   |   16 +-
 .../scala/kafka/admin/CreateTopicCommand.scala     |   10 +-
 .../PreferredReplicaLeaderElectionCommand.scala    |    6 +-
 .../kafka/admin/ReassignPartitionsCommand.scala    |    3 +-
 core/src/main/scala/kafka/api/FetchRequest.scala   |    2 +-
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |    2 +-
 .../main/scala/kafka/api/StopReplicaRequest.scala  |    2 +-
 .../scala/kafka/api/TopicMetadataRequest.scala     |    2 +-
 core/src/main/scala/kafka/cluster/Broker.scala     |   18 ++-
 .../src/main/scala/kafka/consumer/TopicCount.scala |  127 +++++-------
 .../consumer/ZookeeperConsumerConnector.scala      |    8 +-
 .../scala/kafka/controller/KafkaController.scala   |    3 +-
 core/src/main/scala/kafka/utils/Utils.scala        |   66 +++++--
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  151 +++++++--------
 .../test/scala/unit/kafka/admin/AdminTest.scala    |   77 ++++----
 15 files changed, 248 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 8e44cb0..93a634e 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import java.util.Random
 import kafka.api.{TopicMetadata, PartitionMetadata}
 import kafka.cluster.Broker
-import kafka.utils.{Logging, Utils, ZkUtils}
+import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
@@ -50,9 +50,9 @@ object AdminUtils extends Logging {
    * p3        p4        p0        p1        p2       (3nd replica)
    * p7        p8        p9        p5        p6       (3nd replica)
    */
-  def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
+  def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
                               fixedStartIndex: Int = -1)  // for testing only
-  : Map[Int, Seq[String]] = {
+  : Map[Int, Seq[Int]] = {
     if (nPartitions <= 0)
       throw new AdministrationException("number of partitions must be larger than 0")
     if (replicationFactor <= 0)
@@ -60,7 +60,7 @@ object AdminUtils extends Logging {
     if (replicationFactor > brokerList.size)
       throw new AdministrationException("replication factor: " + replicationFactor +
         " larger than available brokers: " + brokerList.size)
-    val ret = new mutable.HashMap[Int, List[String]]()
+    val ret = new mutable.HashMap[Int, List[Int]]()
     val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
 
     var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
@@ -76,12 +76,12 @@ object AdminUtils extends Logging {
     ret.toMap
   }
 
-  def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[String]], zkClient: ZkClient) {
+  def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) {
     try {
       val zkPath = ZkUtils.getTopicPath(topic)
-      val jsonPartitionMap = Utils.mapToJson(replicaAssignment.map(e => (e._1.toString -> e._2)))
-      ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionMap)
-      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
+      val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
+      ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
+      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
       case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
       case e2 => throw new AdministrationException(e2.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index fd3a397..e762115 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -48,8 +48,8 @@ object CreateTopicCommand extends Logging {
                            .defaultsTo(1)
     val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers")
                            .withRequiredArg
-                           .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
-                                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
+                           .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
+                                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
                            .ofType(classOf[String])
                            .defaultsTo("")
 
@@ -96,11 +96,11 @@ object CreateTopicCommand extends Logging {
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
   }
 
-  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Map[Int, List[String]] = {
+  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = {
     val partitionList = replicaAssignmentList.split(",")
-    val ret = new mutable.HashMap[Int, List[String]]()
+    val ret = new mutable.HashMap[Int, List[Int]]()
     for (i <- 0 until partitionList.size) {
-      val brokerList = partitionList(i).split(":").map(s => s.trim())
+      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
       if (brokerList.size <= 0)
         throw new AdministrationException("replication factor must be larger than 0")
       if (brokerList.size != brokerList.toSet.size)

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 7b40019..ebcf669 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -92,9 +92,9 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
   def writePreferredReplicaElectionData(zkClient: ZkClient,
                                         partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
     val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-    val jsonData = Utils.arrayToJson(partitionsUndergoingPreferredReplicaElection.map { p =>
-      Utils.stringMapToJson(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)))
-    }.toArray)
+    val jsonData = Utils.seqToJson(partitionsUndergoingPreferredReplicaElection.map { p =>
+      Utils.mapToJson(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)), valueInQuotes = true)
+    }.toSeq.sorted, valueInQuotes = false)
     try {
       ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
       info("Created preferred replica election path with %s".format(jsonData))

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index f7e7b72..b2204b8 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -99,8 +99,7 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immut
   def reassignPartitions(): Boolean = {
     try {
       val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition))
-      val jsonReassignmentData = Utils.mapToJson(validPartitions.map(p =>
-        ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2.map(_.toString)))
+      val jsonReassignmentData = Utils.mapWithSeqValuesToJson(validPartitions.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2))
       ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
       true
     }catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 19c961e..dc4ed8e 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -24,7 +24,7 @@ import scala.collection.immutable.Map
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.atomic.AtomicInteger
-import kafka.network.{RequestChannel}
+import kafka.network.RequestChannel
 
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 616f679..d146b14 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -41,7 +41,7 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
     jsonDataMap.put("leader", leader.toString)
     jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
     jsonDataMap.put("ISR", isr.mkString(","))
-    Utils.stringMapToJson(jsonDataMap)
+    Utils.mapToJson(jsonDataMap, valueInQuotes = true)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 0580636..be3c7be 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -23,7 +23,7 @@ import kafka.api.ApiUtils._
 import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
 import kafka.common.ErrorMapping
 import kafka.network.RequestChannel.Response
-import kafka.utils.{Logging}
+import kafka.utils.Logging
 
 
 object StopReplicaRequest extends Logging {

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 824f8f1..88007b1 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -23,7 +23,7 @@ import collection.mutable.ListBuffer
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.common.ErrorMapping
 import kafka.network.RequestChannel.Response
-import kafka.utils.{Logging}
+import kafka.utils.Logging
 
 object TopicMetadataRequest extends Logging {
   val CurrentVersion = 0.shortValue

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index ffedecd..6e91d29 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -18,9 +18,10 @@
 package kafka.cluster
 
 import kafka.utils.Utils._
+import kafka.utils.Json
 import kafka.api.ApiUtils._
 import java.nio.ByteBuffer
-import kafka.common.BrokerNotAvailableException
+import kafka.common.{KafkaException, BrokerNotAvailableException}
 
 /**
  * A Kafka broker
@@ -30,8 +31,19 @@ private[kafka] object Broker {
   def createBroker(id: Int, brokerInfoString: String): Broker = {
     if(brokerInfoString == null)
       throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
-    val brokerInfo = brokerInfoString.split(":")
-    new Broker(id, brokerInfo(0), brokerInfo(1).toInt)
+    try {
+      Json.parseFull(brokerInfoString) match {
+        case Some(m) =>
+          val brokerInfo = m.asInstanceOf[Map[String, Any]]
+          val host = brokerInfo.get("host").get.toString
+          val port = brokerInfo.get("port").get.asInstanceOf[Int]
+          new Broker(id, host, port)
+        case None =>
+          throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
+      }
+    } catch {
+      case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
+    }
   }
 
   def readFrom(buffer: ByteBuffer): Broker = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 9cf171e..c8e8406 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -19,12 +19,14 @@ package kafka.consumer
 
 import scala.collection._
 import org.I0Itec.zkclient.ZkClient
-import java.util.regex.Pattern
 import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
+import kafka.common.KafkaException
 
 private[kafka] trait TopicCount {
+
   def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
   def dbString: String
+  def pattern: String
   
   protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
                                             topicCountMap: Map[String,  Int]) = {
@@ -41,75 +43,56 @@ private[kafka] trait TopicCount {
 }
 
 private[kafka] object TopicCount extends Logging {
+  val whiteListPattern = "white_list"
+  val blackListPattern = "black_list"
+  val staticPattern = "static"
 
-  /*
-   * Example of whitelist topic count stored in ZooKeeper:
-   * Topics with whitetopic as prefix, and four streams: *4*whitetopic.*
-   *
-   * Example of blacklist topic count stored in ZooKeeper:
-   * Topics with blacktopic as prefix, and four streams: !4!blacktopic.*
-   */
-
-  val WHITELIST_MARKER = "*"
-  val BLACKLIST_MARKER = "!"
-  private val WHITELIST_PATTERN =
-    Pattern.compile("""\*(\p{Digit}+)\*(.*)""")
-  private val BLACKLIST_PATTERN =
-    Pattern.compile("""!(\p{Digit}+)!(.*)""")
-
-  def constructTopicCount(group: String,
-                          consumerId: String,
-                          zkClient: ZkClient) : TopicCount = {
+  def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient) : TopicCount = {
     val dirs = new ZKGroupDirs(group)
     val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1
-    val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER)
-    val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER)
-
-    if (hasWhitelist || hasBlacklist)
-      info("Constructing topic count for %s from %s using %s as pattern."
-        .format(consumerId, topicCountString,
-          if (hasWhitelist) WHITELIST_PATTERN else BLACKLIST_PATTERN))
-
-    if (hasWhitelist || hasBlacklist) {
-      val matcher = if (hasWhitelist)
-        WHITELIST_PATTERN.matcher(topicCountString)
-      else
-        BLACKLIST_PATTERN.matcher(topicCountString)
-      require(matcher.matches())
-      val numStreams = matcher.group(1).toInt
-      val regex = matcher.group(2)
-      val filter = if (hasWhitelist)
-        new Whitelist(regex)
-      else
-        new Blacklist(regex)
-
-      new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
-    }
-    else {
-      var topMap : Map[String,Int] = null
-      try {
-        Json.parseFull(topicCountString) match {
-          case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
-          case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
-        }
-      }
-      catch {
-        case e =>
-          error("error parsing consumer json string " + topicCountString, e)
-          throw e
+    var subscriptionPattern: String = null
+    var topMap: Map[String, Int] = null
+    try {
+      Json.parseFull(topicCountString) match {
+        case Some(m) =>
+          val consumerRegistrationMap = m.asInstanceOf[Map[String, Any]]
+          consumerRegistrationMap.get("pattern") match {
+            case Some(pattern) => subscriptionPattern = pattern.asInstanceOf[String]
+            case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
+          }
+          consumerRegistrationMap.get("subscription") match {
+            case Some(sub) => topMap = sub.asInstanceOf[Map[String, Int]]
+            case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
+          }
+        case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
       }
+    } catch {
+      case e =>
+        error("error parsing consumer json string " + topicCountString, e)
+        throw e
+    }
+
+    val hasWhiteList = whiteListPattern.equals(subscriptionPattern)
+    val hasBlackList = blackListPattern.equals(subscriptionPattern)
 
+    if (topMap.isEmpty || !(hasWhiteList || hasBlackList)) {
       new StaticTopicCount(consumerId, topMap)
+    } else {
+      val regex = topMap.head._1
+      val numStreams = topMap.head._2
+      val filter =
+        if (hasWhiteList)
+          new Whitelist(regex)
+        else
+          new Blacklist(regex)
+      new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
     }
   }
 
-  def constructTopicCount(consumerIdString: String, topicCount: Map[String,  Int]) =
+  def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
     new StaticTopicCount(consumerIdString, topicCount)
 
-  def constructTopicCount(consumerIdString: String,
-                          filter: TopicFilter,
-                          numStreams: Int,
-                          zkClient: ZkClient) =
+  def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkClient: ZkClient) =
     new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams)
 
 }
@@ -118,8 +101,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
                                 val topicCountMap: Map[String, Int])
                                 extends TopicCount {
 
-  def getConsumerThreadIdsPerTopic =
-    makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
+  def getConsumerThreadIdsPerTopic = makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
 
   override def equals(obj: Any): Boolean = {
     obj match {
@@ -132,8 +114,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
   /**
    *  return json of
    *  { "topic1" : 4,
-   *    "topic2" : 4
-   *  }
+   *    "topic2" : 4 }
    */
   def dbString = {
     val builder = new StringBuilder
@@ -148,6 +129,8 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
     builder.append(" }")
     builder.toString()
   }
+
+  def pattern = TopicCount.staticPattern
 }
 
 private[kafka] class WildcardTopicCount(zkClient: ZkClient,
@@ -155,19 +138,17 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
                                         topicFilter: TopicFilter,
                                         numStreams: Int) extends TopicCount {
   def getConsumerThreadIdsPerTopic = {
-    val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(
-      zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_))
-    makeConsumerThreadIdsPerTopic(consumerIdString,
-                                  Map(wildcardTopics.map((_, numStreams)): _*))
+    val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_))
+    makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
   }
 
-  def dbString = {
-    val marker = topicFilter match {
-      case wl: Whitelist => TopicCount.WHITELIST_MARKER
-      case bl: Blacklist => TopicCount.BLACKLIST_MARKER
-    }
+  def dbString = "{ \"%s\" : %d }".format(topicFilter.regex, numStreams)
 
-    "%s%d%s%s".format(marker, numStreams, marker, topicFilter.regex)
+  def pattern: String = {
+    topicFilter match {
+      case wl: Whitelist => TopicCount.whiteListPattern
+      case bl: Blacklist => TopicCount.blackListPattern
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index a780a41..9db9a8b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -33,7 +33,6 @@ import kafka.utils.ZkUtils._
 import kafka.common._
 import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
-import kafka.api.OffsetRequest
 import kafka.metrics._
 import scala.Some
 
@@ -213,9 +212,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
     info("begin registering consumer " + consumerIdString + " in ZK")
-    createEphemeralPathExpectConflict(zkClient,
-                                      dirs.consumerRegistryDir + "/" + consumerIdString,
-                                      topicCount.dbString)
+    val consumerRegistrationInfo =
+      Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false),
+                                 Utils.mapToJson(Map("pattern" -> topicCount.pattern), valueInQuotes = true)))
+    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f1a12c0..4d253da 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -613,8 +613,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                                          newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
     try {
       val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
-      val jsonPartitionMap = Utils.mapToJson(newReplicaAssignmentForTopic.map(e =>
-        (e._1.partition.toString -> e._2.map(_.toString))))
+      val jsonPartitionMap = ZkUtils.replicaAssignmentZkdata(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
       ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 0185c14..942d6c3 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -464,17 +464,20 @@ object Utils extends Logging {
   def nullOrEmpty(s: String): Boolean = s == null || s.equals("")
 
   /**
-   * Format a Map[String, String] as JSON
+   * Format a Map[String, String] as JSON object.
    */
-  def stringMapToJson(jsonDataMap: Map[String, String]): String = {
+  def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = {
     val builder = new StringBuilder
     builder.append("{ ")
     var numElements = 0
-    for ( (key, value) <- jsonDataMap) {
+    for ( (key, value) <- jsonDataMap.toList.sorted) {
       if (numElements > 0)
-        builder.append(",")
+        builder.append(", ")
       builder.append("\"" + key + "\":")
-      builder.append("\"" + value + "\"")
+      if (valueInQuotes)
+        builder.append("\"" + value + "\"")
+      else
+        builder.append(value)
       numElements += 1
     }
     builder.append(" }")
@@ -482,40 +485,61 @@ object Utils extends Logging {
   }
 
   /**
-   * Format an arbitrary map as JSON
+   * Format a Seq[String] as JSON array.
    */
-  def mapToJson[T <: Any](map: Map[String, Seq[String]]): String = {
+  def seqToJson(jsonData: Seq[String], valueInQuotes: Boolean): String = {
+    val builder = new StringBuilder
+    builder.append("[ ")
+    if (valueInQuotes)
+      builder.append(jsonData.map("\"" + _ + "\"")).mkString(", ")
+    else
+      builder.append(jsonData.mkString(", "))
+    builder.append(" ]")
+    builder.toString
+  }
+
+  /**
+   * Format a Map[String, Seq[Int]] as JSON
+   */
+
+  def mapWithSeqValuesToJson(jsonDataMap: Map[String, Seq[Int]]): String = {
     val builder = new StringBuilder
     builder.append("{ ")
     var numElements = 0
-    for ( (key, value) <- map ) {
+    for ((key, value) <- jsonDataMap.toList.sortBy(_._1)) {
       if (numElements > 0)
-        builder.append(",")
+        builder.append(", ")
       builder.append("\"" + key + "\": ")
-      builder.append("[%s]".format(value.map("\""+_+"\"").mkString(",")))
+      builder.append(Utils.seqToJson(value.map(_.toString), valueInQuotes = false))
       numElements += 1
     }
     builder.append(" }")
     builder.toString
   }
 
+
   /**
-   * Format a string array as json
+   * Merge arbitrary JSON objects.
    */
-  def arrayToJson[T <: Any](arr: Array[String]): String = {
+  def mergeJsonObjects(objects: Seq[String]): String = {
     val builder = new StringBuilder
-    builder.append("[ ")
-    var numElements = 0
-    for ( value <- arr ) {
-      if (numElements > 0)
-        builder.append(",")
-      builder.append(" " + value + "  ")
-      numElements += 1
-    }
-    builder.append(" ]")
+    builder.append("{ ")
+    var obs = List[String]()
+    objects.foreach(ob => obs = obs ::: getJsonContents(ob).split(',').toList)
+    obs = obs.sorted.map(_.trim)
+    builder.append(obs.mkString(", "))
+    builder.append(" }")
     builder.toString
   }
 
+  /**
+   * Get the contents of a JSON object or array.
+   */
+  def getJsonContents(str: String): String = {
+    str.trim().substring(1, str.length - 1)
+  }
+
+
 
   /**
    * Create a circular (looping) iterator over a collection.

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 113ad37..f0aba12 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -22,21 +22,24 @@ import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
 import org.I0Itec.zkclient.serialize.ZkSerializer
-import scala.collection._
+import collection._
 import kafka.api.LeaderAndIsr
-import mutable.HashMap
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
 import kafka.admin._
-import kafka.common.{TopicAndPartition, KafkaException, NoEpochForPartitionException}
-import kafka.controller.{LeaderIsrAndControllerEpoch, PartitionAndReplica, ReassignedPartitionsContext}
+import kafka.common.{KafkaException, NoEpochForPartitionException}
+import kafka.controller.ReassignedPartitionsContext
+import kafka.controller.PartitionAndReplica
+import scala.Some
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.common.TopicAndPartition
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
   val ControllerPath = "/controller"
-  val ControllerEpochPath = "/controllerEpoch"
+  val ControllerEpochPath = "/controller_epoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
 
@@ -60,15 +63,11 @@ object ZkUtils extends Logging {
   }
 
   def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String ={
-    getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR"
+    getTopicPartitionPath(topic, partitionId) + "/" + "state"
   }
 
-  def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
-    ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
-  }
-
-  def getAllLiveBrokerIds(zkClient: ZkClient): Set[Int] = {
-    ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).toSet
+  def getSortedBrokerList(zkClient: ZkClient): Seq[Int] ={
+    ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).sorted
   }
 
   def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
@@ -95,16 +94,15 @@ object ZkUtils extends Logging {
   : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
-        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
-        val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
-        val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
-        val isrString = leaderIsrAndEpochInfo.get("ISR").get
-        val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
-        val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
+        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
+        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
+        val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
+        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
+        val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
           isr.toString(), zkPathVersion, topic, partition))
-        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion), controllerEpoch))
+        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
       case None => None
     }
   }
@@ -115,7 +113,7 @@ object ZkUtils extends Logging {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
           case Some(m) =>
-            Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
+            Some(m.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int])
           case None => None
         }
       case None => None
@@ -133,7 +131,7 @@ object ZkUtils extends Logging {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
           case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
-          case Some(m) => m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+          case Some(m) => m.asInstanceOf[Map[String, Any]].get("leader_epoch").get.asInstanceOf[Int]
         }
       case None => throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty"
         .format(topic, partition))
@@ -148,9 +146,7 @@ object ZkUtils extends Logging {
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
-          case Some(m) =>
-            val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
-            Utils.parseCsvList(isrString).map(r => r.toInt)
+          case Some(m) => m.asInstanceOf[Map[String, Any]].get("isr").get.asInstanceOf[Seq[Int]]
           case None => Seq.empty[Int]
         }
       case None => Seq.empty[Int]
@@ -165,9 +161,12 @@ object ZkUtils extends Logging {
     jsonPartitionMapOpt match {
       case Some(jsonPartitionMap) =>
         Json.parseFull(jsonPartitionMap) match {
-          case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
+          case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
+            case Some(replicaMap) => replicaMap.asInstanceOf[Map[String, Seq[Int]]].get(partition.toString) match {
+              case Some(seq) => seq
+              case None => Seq.empty[Int]
+            }
             case None => Seq.empty[Int]
-            case Some(seq) => seq.map(_.toInt)
           }
           case None => Seq.empty[Int]
         }
@@ -183,9 +182,11 @@ object ZkUtils extends Logging {
 
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
-    val broker = new Broker(id, host, port)
+    val brokerInfo =
+      Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("host" -> host), valueInQuotes = true),
+                                 Utils.mapToJson(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString), valueInQuotes = false)))
     try {
-      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString + ":" + jmxPort)
+      createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
     } catch {
       case e: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
@@ -199,12 +200,17 @@ object ZkUtils extends Logging {
   }
 
   def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
-    val jsonDataMap = new HashMap[String, String]
-    jsonDataMap.put("leader", leaderAndIsr.leader.toString)
-    jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
-    jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
-    jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
-    Utils.stringMapToJson(jsonDataMap)
+    val isrInfo = Utils.seqToJson(leaderAndIsr.isr.map(_.toString), valueInQuotes = false)
+    Utils.mapToJson(Map("version" -> 1.toString, "leader" -> leaderAndIsr.leader.toString, "leader_epoch" -> leaderAndIsr.leaderEpoch.toString,
+                        "controller_epoch" -> controllerEpoch.toString, "isr" -> isrInfo), valueInQuotes = false)
+  }
+
+  /**
+   * Get JSON partition to replica map from zookeeper.
+   */
+  def replicaAssignmentZkdata(map: Map[String, Seq[Int]]): String = {
+    val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map.map(e => (e._1.toString -> e._2)))
+    Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonReplicaAssignmentMap), valueInQuotes = false)
   }
 
   /**
@@ -453,30 +459,7 @@ object ZkUtils extends Logging {
     cluster
   }
 
-  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]):
-  mutable.Map[(String, Int), Seq[Int]] = {
-    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
-    topics.foreach { topic =>
-      val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
-      jsonPartitionMapOpt match {
-        case Some(jsonPartitionMap) =>
-          Json.parseFull(jsonPartitionMap) match {
-            case Some(m) =>
-              val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
-              for((partition, replicas) <- replicaMap){
-                ret.put((topic, partition.toInt), replicas.map(_.toInt))
-                debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
-              }
-            case None =>
-          }
-        case None =>
-      }
-    }
-    ret
-  }
-
-  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]):
-  mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
     for((topic, partitions) <- partitionsForTopics) {
@@ -497,12 +480,15 @@ object ZkUtils extends Logging {
       jsonPartitionMapOpt match {
         case Some(jsonPartitionMap) =>
           Json.parseFull(jsonPartitionMap) match {
-            case Some(m) =>
-              val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
-              for((partition, replicas) <- replicaMap){
-                ret.put(TopicAndPartition(topic, partition.toInt), replicas.map(_.toInt))
-                debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
-              }
+            case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
+              case Some(repl)  =>
+                val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
+                for((partition, replicas) <- replicaMap){
+                  ret.put(TopicAndPartition(topic, partition.toInt), replicas)
+                  debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
+                }
+              case None =>
+            }
             case None =>
           }
         case None =>
@@ -511,17 +497,19 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]):
-  mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
+  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
     val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
     topics.foreach{ topic =>
       val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
       val partitionMap = jsonPartitionMapOpt match {
         case Some(jsonPartitionMap) =>
           Json.parseFull(jsonPartitionMap) match {
-            case Some(m) =>
-              val m1 = m.asInstanceOf[Map[String, Seq[String]]]
-              m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
+            case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
+              case Some(replicaMap) =>
+                val m1 = replicaMap.asInstanceOf[Map[String, Seq[Int]]]
+                m1.map(p => (p._1.toInt, p._2))
+              case None => Map[Int, Seq[Int]]()
+            }
             case None => Map[Int, Seq[Int]]()
           }
         case None => Map[Int, Seq[Int]]()
@@ -544,8 +532,7 @@ object ZkUtils extends Logging {
   }
 
   def getPartitionsForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
-    getPartitionAssignmentForTopics(zkClient, topics).map
-    { topicAndPartitionMap =>
+    getPartitionAssignmentForTopics(zkClient, topics).map { topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
       val partitionMap = topicAndPartitionMap._2
       debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
@@ -553,8 +540,7 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int):
-    Seq[(String, Int)] = {
+  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Seq[(String, Int)] = {
     val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics)
     topicsAndPartitions.map { topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
@@ -584,11 +570,11 @@ object ZkUtils extends Logging {
   def parsePartitionReassignmentData(jsonData: String):Map[TopicAndPartition, Seq[Int]] = {
     Json.parseFull(jsonData) match {
       case Some(m) =>
-        val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
+        val replicaMap = m.asInstanceOf[Map[String, Seq[Int]]]
         replicaMap.map { reassignedPartitions =>
-          val topic = reassignedPartitions._1.split(",").head
-          val partition = reassignedPartitions._1.split(",").last.toInt
-          val newReplicas = reassignedPartitions._2.map(_.toInt)
+          val topic = reassignedPartitions._1.split(",").head.trim
+          val partition = reassignedPartitions._1.split(",").last.trim.toInt
+          val newReplicas = reassignedPartitions._2
           TopicAndPartition(topic, partition) -> newReplicas
         }
       case None => Map.empty[TopicAndPartition, Seq[Int]]
@@ -602,8 +588,7 @@ object ZkUtils extends Logging {
         deletePath(zkClient, zkPath)
         info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
       case _ =>
-        val jsonData = Utils.mapToJson(partitionsToBeReassigned.map(p =>
-          ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2.map(_.toString)))
+        val jsonData = Utils.mapWithSeqValuesToJson(partitionsToBeReassigned.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2))
         try {
           updatePersistentPath(zkClient, zkPath, jsonData)
           info("Updated partition reassignment path with %s".format(jsonData))
@@ -641,7 +626,7 @@ object ZkUtils extends Logging {
       case Some(m) =>
         val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]]
         val partitions = topicAndPartitions.map { p =>
-          val topicPartitionMap = p.asInstanceOf[Map[String, String]]
+          val topicPartitionMap = p
           val topic = topicPartitionMap.get("topic").get
           val partition = topicPartitionMap.get("partition").get.toInt
           TopicAndPartition(topic, partition)
@@ -698,8 +683,10 @@ object ZkUtils extends Logging {
 
   def getAllTopics(zkClient: ZkClient): Seq[String] = {
     val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
-    if(topics == null) Seq.empty[String]
-    else topics
+    if(topics == null)
+      Seq.empty[String]
+    else
+      topics
   }
 
   def getAllPartitions(zkClient: ZkClient): Set[TopicAndPartition] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/828ce83d/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7a31b51..4dd1ba7 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -21,15 +21,15 @@ import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.{ZkUtils, TestUtils}
+import kafka.utils.{Logging, ZkUtils, TestUtils}
 import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
 
 
-class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
+class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   @Test
   def testReplicaAssignment() {
-    val brokerList = List("0", "1", "2", "3", "4")
+    val brokerList = List(0, 1, 2, 3, 4)
 
     // test 0 replication factor
     try {
@@ -54,16 +54,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
     // correct assignment
     {
       val expectedAssignment = Map(
-        0 -> List("0", "1", "2"),
-        1 -> List("1", "2", "3"),
-        2 -> List("2", "3", "4"),
-        3 -> List("3", "4", "0"),
-        4 -> List("4", "0", "1"),
-        5 -> List("0", "2", "3"),
-        6 -> List("1", "3", "4"),
-        7 -> List("2", "4", "0"),
-        8 -> List("3", "0", "1"),
-        9 -> List("4", "1", "2")
+        0 -> List(0, 1, 2),
+        1 -> List(1, 2, 3),
+        2 -> List(2, 3, 4),
+        3 -> List(3, 4, 0),
+        4 -> List(4, 0, 1),
+        5 -> List(0, 2, 3),
+        6 -> List(1, 3, 4),
+        7 -> List(2, 4, 0),
+        8 -> List(3, 0, 1),
+        9 -> List(4, 1, 2)
       )
 
       val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
@@ -74,7 +74,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testManualReplicaAssignment() {
-    val brokerList = Set("0", "1", "2", "3", "4")
+    val brokerList = Set(0, 1, 2, 3, 4)
 
     // duplicated brokers
     try {
@@ -113,8 +113,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
     {
       val replicationAssignmentStr = "0:1:2,1:2:3"
       val expectedReplicationAssignment = Map(
-        0 -> List("0", "1", "2"),
-        1 -> List("1", "2", "3")
+        0 -> List(0, 1, 2),
+        1 -> List(1, 2, 3)
       )
       val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
       assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size)
@@ -127,18 +127,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testTopicCreationInZK() {
     val expectedReplicaAssignment = Map(
-      0  -> List("0", "1", "2"),
-      1  -> List("1", "2", "3"),
-      2  -> List("2", "3", "4"),
-      3  -> List("3", "4", "0"),
-      4  -> List("4", "0", "1"),
-      5  -> List("0", "2", "3"),
-      6  -> List("1", "3", "4"),
-      7  -> List("2", "4", "0"),
-      8  -> List("3", "0", "1"),
-      9  -> List("4", "1", "2"),
-      10 -> List("1", "2", "3"),
-      11 -> List("1", "3", "4")
+      0  -> List(0, 1, 2),
+      1  -> List(1, 2, 3),
+      2  -> List(2, 3, 4),
+      3  -> List(3, 4, 0),
+      4  -> List(4, 0, 1),
+      5  -> List(0, 2, 3),
+      6  -> List(1, 3, 4),
+      7  -> List(2, 4, 0),
+      8  -> List(3, 0, 1),
+      9  -> List(4, 1, 2),
+      10 -> List(1, 2, 3),
+      11 -> List(1, 3, 4)
     )
     val leaderForPartitionMap = Map(
       0 -> 0,
@@ -161,7 +161,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
-    val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
+    val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
     for(i <- 0 until actualReplicaList.size)
       assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
@@ -178,8 +178,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testGetTopicMetadata() {
     val expectedReplicaAssignment = Map(
-      0 -> List("0", "1", "2"),
-      1 -> List("1", "2", "3")
+      0 -> List(0, 1, 2),
+      1 -> List(1, 2, 3)
     )
     val leaderForPartitionMap = Map(
       0 -> 0,
@@ -200,7 +200,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
         assertNotNull("partition metadata list cannot be null", newTopicMetadata.partitionsMetadata)
         assertEquals("partition metadata list length should be 2", 2, newTopicMetadata.partitionsMetadata.size)
         val actualReplicaAssignment = newTopicMetadata.partitionsMetadata.map(p => p.replicas)
-        val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
+        val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList
         assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
         for(i <- 0 until actualReplicaList.size) {
           assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
@@ -210,7 +210,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testPartitionReassignmentWithLeaderInNewReplicas() {
-    val expectedReplicaAssignment = Map(0  -> List("0", "1", "2"))
+    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
@@ -235,7 +235,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testPartitionReassignmentWithLeaderNotInNewReplicas() {
-    val expectedReplicaAssignment = Map(0  -> List("0", "1", "2"))
+    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
@@ -261,7 +261,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testPartitionReassignmentNonOverlappingReplicas() {
-    val expectedReplicaAssignment = Map(0  -> List("0", "1"))
+    val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create brokers
     val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
@@ -304,7 +304,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testResumePartitionReassignmentThatWasCompleted() {
-    val expectedReplicaAssignment = Map(0  -> List("0", "1"))
+    val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
@@ -339,7 +339,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testBasicPreferredReplicaElection() {
-    val expectedReplicaAssignment = Map(1  -> List("0", "1", "2"))
+    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
     val topic = "test"
     val partition = 1
     val preferredReplica = 0
@@ -360,7 +360,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testShutdownBroker() {
-    val expectedReplicaAssignment = Map(1  -> List("0", "1", "2"))
+    info("inside testShutdownBroker")
+    val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
     val topic = "test"
     val partition = 1
     // create brokers


Mime
View raw message