kafka-commits mailing list archives

From: jkr...@apache.org
Subject: svn commit: r1397765 [1/2] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/client/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/con...
Date: Sat, 13 Oct 2012 03:35:05 GMT
Author: jkreps
Date: Sat Oct 13 03:35:02 2012
New Revision: 1397765

URL: http://svn.apache.org/viewvc?rev=1397765&view=rev
Log:
KAFKA-569 Split up utils package and do some cleanup. Patch reviewed by Neha.
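
Most of this patch mechanically migrates call sites from the charset-parameterized string helpers in kafka.utils.Utils to the new kafka.api.ApiUtils object, which pins the protocol encoding to UTF-8 internally. A representative before/after of the call-site change (an illustrative sketch, not part of the commit):

    // before: every call site threads the charset through explicitly
    val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)

    // after: ApiUtils fixes ProtocolEncoding = "UTF-8", so the charset argument goes away
    import kafka.api.ApiUtils._
    val clientId = readShortString(buffer)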


Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ApiUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/client/
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/CommandLineUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Json.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/MirrorMaker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Sat Oct 13 03:35:02 2012
@@ -35,7 +35,7 @@ object Kafka extends Logging {
       val verifiableProps = serverConfig.props
       val metricsConfig = new KafkaMetricsConfig(verifiableProps)
       metricsConfig.reporters.foreach(reporterType => {
-        val reporter = Utils.getObject[KafkaMetricsReporter](reporterType)
+        val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
         reporter.init(verifiableProps)
         if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
           Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Sat Oct 13 03:35:02 2012
@@ -18,7 +18,8 @@
 package kafka.admin
 
 import java.util.Random
-import kafka.api.{TopicMetadata, PartitionMetadata}
+import kafka.api.{TopicMetadata, PartitionMetadata, TopicMetadataRequest, TopicMetadataResponse}
+import kafka.common._
 import kafka.cluster.Broker
 import kafka.utils.{Logging, Utils, ZkUtils}
 import org.I0Itec.zkclient.ZkClient

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala Sat Oct 13 03:35:02 2012
@@ -34,12 +34,12 @@ object CheckReassignmentStatus extends L
 
     val jsonFile = options.valueOf(jsonFileOpt)
     val zkConnect = options.valueOf(zkConnectOpt)
-    val jsonString = Utils.readFileIntoString(jsonFile)
+    val jsonString = Utils.readFileAsString(jsonFile)
     val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
 
     try {
       // read the json file into a string
-      val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match {
+      val partitionsToBeReassigned = Json.parseFull(jsonString) match {
         case Some(reassignedPartitions) =>
           val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
           partitions.map { m =>

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala Sat Oct 13 03:35:02 2012
@@ -42,11 +42,11 @@ object PreferredReplicaLeaderElectionCom
 
     val options = parser.parse(args : _*)
 
-    Utils.checkRequiredArgs(parser, options, jsonFileOpt, zkConnectOpt)
+    CommandLineUtils.checkRequiredArgs(parser, options, jsonFileOpt, zkConnectOpt)
 
     val jsonFile = options.valueOf(jsonFileOpt)
     val zkConnect = options.valueOf(zkConnectOpt)
-    val jsonString = Utils.readFileIntoString(jsonFile)
+    val jsonString = Utils.readFileAsString(jsonFile)
     var zkClient: ZkClient = null
 
     try {
@@ -77,7 +77,7 @@ object PreferredReplicaLeaderElectionCom
   }
 
   def parsePreferredReplicaJsonData(jsonString: String): Set[TopicAndPartition] = {
-    SyncJSON.parseFull(jsonString) match {
+    Json.parseFull(jsonString) match {
       case Some(partitionList) =>
         val partitions = (partitionList.asInstanceOf[List[Any]])
         Set.empty[TopicAndPartition] ++ partitions.map { m =>
@@ -93,7 +93,7 @@ object PreferredReplicaLeaderElectionCom
                                         partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
     val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
     val jsonData = Utils.arrayToJson(partitionsUndergoingPreferredReplicaElection.map { p =>
-      Utils.stringMapToJsonString(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)))
+      Utils.stringMapToJson(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)))
     }.toArray)
     try {
       ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala Sat Oct 13 03:35:02 2012
@@ -50,12 +50,12 @@ object ReassignPartitionsCommand extends
 
     val jsonFile = options.valueOf(jsonFileOpt)
     val zkConnect = options.valueOf(zkConnectOpt)
-    val jsonString = Utils.readFileIntoString(jsonFile)
+    val jsonString = Utils.readFileAsString(jsonFile)
     var zkClient: ZkClient = null
 
     try {
       // read the json file into a string
-      val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match {
+      val partitionsToBeReassigned = Json.parseFull(jsonString) match {
         case Some(reassignedPartitions) =>
           val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
           partitions.map { m =>

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ApiUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ApiUtils.scala?rev=1397765&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ApiUtils.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ApiUtils.scala Sat Oct 13 03:35:02 2012
@@ -0,0 +1,92 @@
+package kafka.api
+
+import java.nio._
+import kafka.common._
+
+/**
+ * Helper functions specific to parsing or serializing requests and responses
+ */
+object ApiUtils {
+  
+  val ProtocolEncoding = "UTF-8"
+
+    /**
+   * Read size prefixed string where the size is stored as a 2 byte short.
+   * @param buffer The buffer to read from
+   */
+  def readShortString(buffer: ByteBuffer): String = {
+    val size: Int = buffer.getShort()
+    if(size < 0)
+      return null
+    val bytes = new Array[Byte](size)
+    buffer.get(bytes)
+    new String(bytes, ProtocolEncoding)
+  }
+  
+  /**
+   * Write a size prefixed string where the size is stored as a 2 byte short
+   * @param buffer The buffer to write to
+   * @param string The string to write
+   */
+  def writeShortString(buffer: ByteBuffer, string: String) {
+    if(string == null) {
+      buffer.putShort(-1)
+    } else if(string.length > Short.MaxValue) {
+      throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
+    } else {
+      buffer.putShort(string.length.asInstanceOf[Short])
+      buffer.put(string.getBytes(ProtocolEncoding))
+    }
+  }
+  
+  /**
+   * Return size of a size prefixed string where the size is stored as a 2 byte short
+   * @param string The string to write
+   */
+  def shortStringLength(string: String): Int = {
+    if(string == null) {
+      2
+    } else {
+      val encodedString = string.getBytes(ProtocolEncoding)
+      if(encodedString.length > Short.MaxValue) {
+        throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
+      } else {
+        2 + encodedString.length
+      }
+    }
+  }
+  
+  /**
+   * Read an integer out of the bytebuffer from the current position and check that it falls within the given
+   * range. If not, throw KafkaException.
+   */
+  def readIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
+    val value = buffer.getInt
+    if(value < range._1 || value > range._2)
+      throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+
+  /**
+   * Read a short out of the bytebuffer from the current position and check that it falls within the given
+   * range. If not, throw KafkaException.
+   */
+  def readShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = {
+    val value = buffer.getShort
+    if(value < range._1 || value > range._2)
+      throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+
+  /**
+   * Read a long out of the bytebuffer from the current position and check that it falls within the given
+   * range. If not, throw KafkaException.
+   */
+  def readLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
+    val value = buffer.getLong
+    if(value < range._1 || value > range._2)
+      throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+  
+}
\ No newline at end of file
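
A minimal round-trip sketch of the string helpers added above (illustrative only; assumes the kafka.api.ApiUtils object as defined in this commit):

    import java.nio.ByteBuffer
    import kafka.api.ApiUtils._

    val topic = "my-topic"
    // shortStringLength reports 2 (size prefix) + encoded byte length
    val buf = ByteBuffer.allocate(shortStringLength(topic))
    writeShortString(buf, topic)
    buf.rewind()
    assert(readShortString(buf) == topic)
    // note: writeShortString prefixes string.length (a char count) while
    // shortStringLength counts UTF-8 bytes, so the two agree only for
    // single-byte characters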

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Sat Oct 13 03:35:02 2012
@@ -18,7 +18,8 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.utils.{nonthreadsafe, Utils}
+import kafka.utils.nonthreadsafe
+import kafka.api.ApiUtils._
 import scala.collection.immutable.Map
 import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
@@ -35,13 +36,13 @@ object FetchRequest {
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+    val clientId = readShortString(buffer)
     val replicaId = buffer.getInt
     val maxWait = buffer.getInt
     val minBytes = buffer.getInt
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
-      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+      val topic = readShortString(buffer)
       val partitionCount = buffer.getInt
       (1 to partitionCount).map(_ => {
         val partitionId = buffer.getInt
@@ -71,14 +72,14 @@ case class FetchRequest(versionId: Short
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
-    Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
+    writeShortString(buffer, clientId)
     buffer.putInt(replicaId)
     buffer.putInt(maxWait)
     buffer.putInt(minBytes)
     buffer.putInt(requestInfoGroupedByTopic.size) // topic count
     requestInfoGroupedByTopic.foreach {
       case (topic, partitionFetchInfos) =>
-        Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+        writeShortString(buffer, topic)
         buffer.putInt(partitionFetchInfos.size) // partition count
         partitionFetchInfos.foreach {
           case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) =>
@@ -92,7 +93,7 @@ case class FetchRequest(versionId: Short
   def sizeInBytes: Int = {
     2 + /* versionId */
     4 + /* correlationId */
-    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
+    shortStringLength(clientId) +
     4 + /* replicaId */
     4 + /* maxWait */
     4 + /* minBytes */
@@ -100,7 +101,7 @@ case class FetchRequest(versionId: Short
     requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
       val (topic, partitionFetchInfos) = currTopic
       foldedTopics +
-      Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+      shortStringLength(topic) +
       4 + /* partition count */
       partitionFetchInfos.size * (
         4 + /* partition id */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Sat Oct 13 03:35:02 2012
@@ -22,7 +22,7 @@ import java.nio.channels.GatheringByteCh
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.network.{MultiSend, Send}
-import kafka.utils.Utils
+import kafka.api.ApiUtils._
 
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
@@ -85,7 +85,7 @@ class PartitionDataSend(val partitionId:
 
 object TopicData {
   def readFrom(buffer: ByteBuffer): TopicData = {
-    val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+    val topic = readShortString(buffer)
     val partitionCount = buffer.getInt
     val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
       val partitionId = buffer.getInt
@@ -96,7 +96,7 @@ object TopicData {
   }
 
   def headerSize(topic: String) =
-    Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+    shortStringLength(topic) +
     4 /* partition count */
 }
 
@@ -115,7 +115,7 @@ class TopicDataSend(val topicData: Topic
   override def complete = sent >= size
 
   private val buffer = ByteBuffer.allocate(topicData.headerSize)
-  Utils.writeShortString(buffer, topicData.topic)
+  writeShortString(buffer, topicData.topic)
   buffer.putInt(topicData.partitionData.size)
   buffer.rewind()
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Sat Oct 13 03:35:02 2012
@@ -20,6 +20,7 @@ package kafka.api
 
 import java.nio._
 import kafka.utils._
+import kafka.api.ApiUtils._
 import collection.mutable.Map
 import collection.mutable.HashMap
 
@@ -30,14 +31,14 @@ object LeaderAndIsr {
 }
 
 case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) {
-  def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
+  def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
 
   override def toString(): String = {
     val jsonDataMap = new HashMap[String, String]
     jsonDataMap.put("leader", leader.toString)
     jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
     jsonDataMap.put("ISR", isr.mkString(","))
-    Utils.stringMapToJsonString(jsonDataMap)
+    Utils.stringMapToJson(jsonDataMap)
   }
 }
 
@@ -46,11 +47,11 @@ object PartitionStateInfo {
   def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
     val leader = buffer.getInt
     val leaderGenId = buffer.getInt
-    val ISRString = Utils.readShortString(buffer, "UTF-8")
-    val ISR = ISRString.split(",").map(_.toInt).toList
+    val isrString = readShortString(buffer)
+    val isr = isrString.split(",").map(_.toInt).toList
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
-    PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, ISR, zkVersion), replicationFactor)
+    PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, isr, zkVersion), replicationFactor)
   }
 }
 
@@ -58,7 +59,7 @@ case class PartitionStateInfo(val leader
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(leaderAndIsr.leader)
     buffer.putInt(leaderAndIsr.leaderEpoch)
-    Utils.writeShortString(buffer, leaderAndIsr.isr.mkString(","), "UTF-8")
+    writeShortString(buffer, leaderAndIsr.isr.mkString(","))
     buffer.putInt(leaderAndIsr.zkVersion)
     buffer.putInt(replicationFactor)
   }
@@ -79,13 +80,13 @@ object LeaderAndIsrRequest {
 
   def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
     val versionId = buffer.getShort
-    val clientId = Utils.readShortString(buffer)
+    val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
     val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo]
 
     for(i <- 0 until partitionStateInfosCount){
-      val topic = Utils.readShortString(buffer, "UTF-8")
+      val topic = readShortString(buffer)
       val partition = buffer.getInt
       val partitionStateInfo = PartitionStateInfo.readFrom(buffer)
 
@@ -108,11 +109,11 @@ case class LeaderAndIsrRequest (versionI
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
-    Utils.writeShortString(buffer, clientId)
+    writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
     buffer.putInt(partitionStateInfos.size)
     for((key, value) <- partitionStateInfos){
-      Utils.writeShortString(buffer, key._1, "UTF-8")
+      writeShortString(buffer, key._1)
       buffer.putInt(key._2)
       value.writeTo(buffer)
     }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala?rev=1397765&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala Sat Oct 13 03:35:02 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import kafka.common.ErrorMapping
+import java.nio.ByteBuffer
+import kafka.utils.Utils
+import kafka.api.ApiUtils._
+import collection.mutable.HashMap
+import collection.Map
+
+
+object LeaderAndIsrResponse {
+  def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
+    val versionId = buffer.getShort
+    val errorCode = buffer.getShort
+    val numEntries = buffer.getInt
+    val responseMap = new HashMap[(String, Int), Short]()
+    for (i<- 0 until numEntries){
+      val topic = readShortString(buffer)
+      val partition = buffer.getInt
+      val partitionErrorCode = buffer.getShort
+      responseMap.put((topic, partition), partitionErrorCode)
+    }
+    new LeaderAndIsrResponse(versionId, responseMap, errorCode)
+  }
+}
+
+
+case class LeaderAndIsrResponse(versionId: Short,
+                                responseMap: Map[(String, Int), Short],
+                                errorCode: Short = ErrorMapping.NoError)
+        extends RequestOrResponse {
+  def sizeInBytes(): Int ={
+    var size = 2 + 2 + 4
+    for ((key, value) <- responseMap){
+      size += 2 + key._1.length + 4 + 2
+    }
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putShort(errorCode)
+    buffer.putInt(responseMap.size)
+    for ((key:(String, Int), value) <- responseMap){
+      writeShortString(buffer, key._1)
+      buffer.putInt(key._2)
+      buffer.putShort(value)
+    }
+  }
+}
\ No newline at end of file
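
A hedged serialize/parse sketch for the response class added above (illustrative; the topic name is a placeholder, and ErrorMapping.NoError is the zero error code referenced in the file):

    import java.nio.ByteBuffer
    import kafka.api.LeaderAndIsrResponse
    import kafka.common.ErrorMapping

    // map one (topic, partition) pair to its per-partition error code
    val resp = LeaderAndIsrResponse(1.shortValue, Map(("my-topic", 0) -> ErrorMapping.NoError))
    val buf = ByteBuffer.allocate(resp.sizeInBytes)
    resp.writeTo(buf)
    buf.rewind()
    val parsed = LeaderAndIsrResponse.readFrom(buf)
    assert(parsed.responseMap(("my-topic", 0)) == ErrorMapping.NoError)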

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Sat Oct 13 03:35:02 2012
@@ -20,6 +20,7 @@ package kafka.api
 import java.nio.ByteBuffer
 import kafka.utils.Utils
 import kafka.common.TopicAndPartition
+import kafka.api.ApiUtils._
 
 
 object OffsetRequest {
@@ -33,11 +34,11 @@ object OffsetRequest {
 
   def readFrom(buffer: ByteBuffer): OffsetRequest = {
     val versionId = buffer.getShort
-    val clientId = Utils.readShortString(buffer)
+    val clientId = readShortString(buffer)
     val replicaId = buffer.getInt
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
-      val topic = Utils.readShortString(buffer)
+      val topic = readShortString(buffer)
       val partitionCount = buffer.getInt
       (1 to partitionCount).map(_ => {
         val partitionId = buffer.getInt
@@ -64,13 +65,13 @@ case class OffsetRequest(requestInfo: Ma
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
-    Utils.writeShortString(buffer, clientId)
+    writeShortString(buffer, clientId)
     buffer.putInt(replicaId)
 
     buffer.putInt(requestInfoGroupedByTopic.size) // topic count
     requestInfoGroupedByTopic.foreach {
       case((topic, partitionInfos)) =>
-        Utils.writeShortString(buffer, topic)
+        writeShortString(buffer, topic)
         buffer.putInt(partitionInfos.size) // partition count
         partitionInfos.foreach {
           case (TopicAndPartition(_, partition), partitionInfo) =>
@@ -83,13 +84,13 @@ case class OffsetRequest(requestInfo: Ma
 
   def sizeInBytes =
     2 + /* versionId */
-    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
+    shortStringLength(clientId) +
     4 + /* replicaId */
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
       val (topic, partitionInfos) = currTopic
       foldedTopics +
-      Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+      shortStringLength(topic) +
       4 + /* partition count */
       partitionInfos.size * (
         4 + /* partition */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala Sat Oct 13 03:35:02 2012
@@ -20,6 +20,7 @@ package kafka.api
 import java.nio.ByteBuffer
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.utils.Utils
+import kafka.api.ApiUtils._
 
 
 object OffsetResponse {
@@ -28,7 +29,7 @@ object OffsetResponse {
     val versionId = buffer.getShort
     val numTopics = buffer.getInt
     val pairs = (1 to numTopics).flatMap(_ => {
-      val topic = Utils.readShortString(buffer)
+      val topic = readShortString(buffer)
       val numPartitions = buffer.getInt
       (1 to numPartitions).map(_ => {
         val partition = buffer.getInt
@@ -61,7 +62,7 @@ case class OffsetResponse(versionId: Sho
     offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
       val (topic, errorAndOffsetsMap) = currTopic
       foldedTopics +
-      Utils.shortStringLength(topic) +
+      shortStringLength(topic) +
       4 + /* partition count */
       errorAndOffsetsMap.foldLeft(0)((foldedPartitions, currPartition) => {
         foldedPartitions +
@@ -78,7 +79,7 @@ case class OffsetResponse(versionId: Sho
     buffer.putInt(offsetsGroupedByTopic.size) // topic count
     offsetsGroupedByTopic.foreach {
       case((topic, errorAndOffsetsMap)) =>
-        Utils.writeShortString(buffer, topic)
+        writeShortString(buffer, topic)
         buffer.putInt(errorAndOffsetsMap.size) // partition count
         errorAndOffsetsMap.foreach {
           case((TopicAndPartition(_, partition), errorAndOffsets)) =>

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Sat Oct 13 03:35:02 2012
@@ -22,6 +22,7 @@ import kafka.message._
 import kafka.utils._
 import scala.collection.Map
 import kafka.common.TopicAndPartition
+import kafka.api.ApiUtils._
 
 
 object ProducerRequest {
@@ -30,14 +31,14 @@ object ProducerRequest {
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
     val correlationId: Int = buffer.getInt
-    val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+    val clientId: String = readShortString(buffer)
     val requiredAcks: Short = buffer.getShort
     val ackTimeoutMs: Int = buffer.getInt
     //build the topic structure
     val topicCount = buffer.getInt
     val partitionDataPairs = (1 to topicCount).flatMap(_ => {
       // process topic
-      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+      val topic = readShortString(buffer)
       val partitionCount = buffer.getInt
       (1 to partitionCount).map(_ => {
         val partition = buffer.getInt
@@ -75,7 +76,7 @@ case class ProducerRequest(versionId: Sh
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
-    Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
+    writeShortString(buffer, clientId)
     buffer.putShort(requiredAcks)
     buffer.putInt(ackTimeoutMs)
 
@@ -83,7 +84,7 @@ case class ProducerRequest(versionId: Sh
     buffer.putInt(dataGroupedByTopic.size) //the number of topics
     dataGroupedByTopic.foreach {
       case (topic, topicAndPartitionData) =>
-        Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic
+        writeShortString(buffer, topic) //write the topic
         buffer.putInt(topicAndPartitionData.size) //the number of partitions
         topicAndPartitionData.foreach(partitionAndData => {
           val partition = partitionAndData._1.partition
@@ -100,13 +101,13 @@ case class ProducerRequest(versionId: Sh
   def sizeInBytes: Int = {
     2 + /* versionId */
     4 + /* correlationId */
-    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */
+    shortStringLength(clientId) + /* client id */
     2 + /* requiredAcks */
     4 + /* ackTimeoutMs */
     4 + /* number of topics */
     dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
       foldedTopics +
-      Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+      shortStringLength(currTopic._1) +
       4 + /* the number of partitions */
       {
         currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala Sat Oct 13 03:35:02 2012
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 import kafka.utils.Utils
 import scala.collection.Map
 import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.api.ApiUtils._
 
 
 object ProducerResponse {
@@ -29,7 +30,7 @@ object ProducerResponse {
     val correlationId = buffer.getInt
     val topicCount = buffer.getInt
     val statusPairs = (1 to topicCount).flatMap(_ => {
-      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+      val topic = readShortString(buffer)
       val partitionCount = buffer.getInt
       (1 to partitionCount).map(_ => {
         val partition = buffer.getInt
@@ -64,7 +65,7 @@ case class ProducerResponse(versionId: S
     4 + /* topic count */
     groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
       foldedTopics +
-      Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+      shortStringLength(currTopic._1) +
       4 + /* partition count for this topic */
       currTopic._2.size * {
         4 + /* partition id */
@@ -83,7 +84,7 @@ case class ProducerResponse(versionId: S
 
     groupedStatus.foreach(topicStatus => {
       val (topic, errorsAndOffsets) = topicStatus
-      Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+      writeShortString(buffer, topic)
       buffer.putInt(errorsAndOffsets.size) // partition count
       errorsAndOffsets.foreach {
         case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset))) =>

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala Sat Oct 13 03:35:02 2012
@@ -29,12 +29,12 @@ object RequestKeys {
   val StopReplicaKey: Short = 5
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
-    Map( ProduceKey -> ("Produce", ProducerRequest.readFrom),
-         FetchKey -> ("Fetch", FetchRequest.readFrom),
-         OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
-         MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
-         LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
-         StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom) )
+    Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
+        FetchKey -> ("Fetch", FetchRequest.readFrom),
+        OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
+        MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
+        LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
+        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom))
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala Sat Oct 13 03:35:02 2012
@@ -19,12 +19,6 @@ package kafka.api
 
 import java.nio._
 
-
-object RequestOrResponse {
-  val DefaultCharset = "UTF-8"
-}
-
-
 object Request {
   val OrdinaryConsumerId: Int = -1
   val DebuggingConsumerId: Int = -2

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Sat Oct 13 03:35:02 2012
@@ -19,7 +19,7 @@ package kafka.api
 
 
 import java.nio._
-import kafka.utils._
+import kafka.api.ApiUtils._
 
 object StopReplicaRequest {
   val CurrentVersion = 1.shortValue()
@@ -28,13 +28,12 @@ object StopReplicaRequest {
 
   def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
     val versionId = buffer.getShort
-    val clientId = Utils.readShortString(buffer)
+    val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
     val topicPartitionPairCount = buffer.getInt
     val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]()
-    for (i <- 0 until topicPartitionPairCount) {
-      topicPartitionPairSet.add(Utils.readShortString(buffer, "UTF-8"), buffer.getInt)
-    }
+    for (i <- 0 until topicPartitionPairCount)
+      topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
     new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet)
   }
 }
@@ -51,11 +50,11 @@ case class StopReplicaRequest(versionId:
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
-    Utils.writeShortString(buffer, clientId)
+    writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
     buffer.putInt(partitions.size)
     for ((topic, partitionId) <- partitions){
-      Utils.writeShortString(buffer, topic, "UTF-8")
+      writeShortString(buffer, topic)
       buffer.putInt(partitionId)
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaResponse.scala Sat Oct 13 03:35:02 2012
@@ -22,6 +22,7 @@ import kafka.utils.Utils
 import collection.mutable.HashMap
 import collection.mutable.Map
 import kafka.common.ErrorMapping
+import kafka.api.ApiUtils._
 
 
 object StopReplicaResponse {
@@ -32,7 +33,7 @@ object StopReplicaResponse {
 
     val responseMap = new HashMap[(String, Int), Short]()
     for (i<- 0 until numEntries){
-      val topic = Utils.readShortString(buffer, "UTF-8")
+      val topic = readShortString(buffer)
       val partition = buffer.getInt
       val partitionErrorCode = buffer.getShort()
       responseMap.put((topic, partition), partitionErrorCode)
@@ -58,7 +59,7 @@ case class StopReplicaResponse(val versi
     buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){
-      Utils.writeShortString(buffer, key._1, "UTF-8")
+      writeShortString(buffer, key._1)
       buffer.putInt(key._2)
       buffer.putShort(value)
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala Sat Oct 13 03:35:02 2012
@@ -19,7 +19,8 @@ package kafka.api
 
 import kafka.cluster.Broker
 import java.nio.ByteBuffer
-import kafka.utils.Utils._
+import kafka.api.ApiUtils._
+import kafka.utils.Logging
 import collection.mutable.ListBuffer
 import kafka.common.{KafkaException, ErrorMapping}
 
@@ -54,9 +55,9 @@ case object LeaderDoesNotExist extends L
 object TopicMetadata {
 
   def readFrom(buffer: ByteBuffer): TopicMetadata = {
-    val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue))
+    val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
-    val numPartitions = getIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
+    val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
     val partitionsMetadata = new ListBuffer[PartitionMetadata]()
     for(i <- 0 until numPartitions)
       partitionsMetadata += PartitionMetadata.readFrom(buffer)
@@ -64,7 +65,7 @@ object TopicMetadata {
   }
 }
 
-case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) {
+case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging {
   def sizeInBytes: Int = {
     var size: Int = 2   /* error code */
     size += shortStringLength(topic)
@@ -87,8 +88,8 @@ case class TopicMetadata(topic: String, 
 object PartitionMetadata {
 
   def readFrom(buffer: ByteBuffer): PartitionMetadata = {
-    val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue))
-    val partitionId = getIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
+    val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
+    val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
     val doesLeaderExist = getLeaderRequest(buffer.get)
     val leader = doesLeaderExist match {
       case LeaderExists => /* leader exists */
@@ -97,14 +98,14 @@ object PartitionMetadata {
     }
 
     /* list of all replicas */
-    val numReplicas = getShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
+    val numReplicas = readShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
     val replicas = new Array[Broker](numReplicas)
     for(i <- 0 until numReplicas) {
       replicas(i) = Broker.readFrom(buffer)
     }
 
     /* list of in-sync replicas */
-    val numIsr = getShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
+    val numIsr = readShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
     val isr = new Array[Broker](numIsr)
     for(i <- 0 until numIsr) {
       isr(i) = Broker.readFrom(buffer)
@@ -122,8 +123,11 @@ object PartitionMetadata {
   }
 }
 
-case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
-                             errorCode: Short = ErrorMapping.NoError) {
+case class PartitionMetadata(partitionId: Int, 
+                             val leader: Option[Broker], 
+                             replicas: Seq[Broker], 
+                             isr: Seq[Broker] = Seq.empty,
+                             errorCode: Short = ErrorMapping.NoError) extends Logging {
   def sizeInBytes: Int = {
     var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Sat Oct 13 03:35:02 2012
@@ -18,11 +18,11 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.utils.Utils._
+import kafka.api.ApiUtils._
 import collection.mutable.ListBuffer
-import kafka.utils._
+import kafka.utils.Logging
 
-object TopicMetadataRequest {
+object TopicMetadataRequest extends Logging {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
 
@@ -33,11 +33,11 @@ object TopicMetadataRequest {
 
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
     val versionId = buffer.getShort
-    val clientId = Utils.readShortString(buffer)
-    val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
+    val clientId = readShortString(buffer)
+    val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue))
     val topics = new ListBuffer[String]()
     for(i <- 0 until numTopics)
-      topics += readShortString(buffer, "UTF-8")
+      topics += readShortString(buffer)
     val topicsList = topics.toList
     debug("topic = %s".format(topicsList.head))
     new TopicMetadataRequest(versionId, clientId, topics.toList)
@@ -54,7 +54,7 @@ def this(topics: Seq[String]) =
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
-    Utils.writeShortString(buffer, clientId)
+    writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
   }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala?rev=1397765&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/client/ClientUtils.scala Sat Oct 13 03:35:02 2012
@@ -0,0 +1,60 @@
+package kafka.client
+
+import scala.collection._
+import kafka.cluster._
+import kafka.api._
+import kafka.producer._
+import kafka.common.KafkaException
+import kafka.utils.{Utils, Logging}
+
+/**
+ * Helper functions common to clients (producer, consumer, or admin)
+ */
+object ClientUtils extends Logging{
+
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = {
+    var fetchMetaDataSucceeded: Boolean = false
+    var i: Int = 0
+    val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq)
+    var topicMetadataResponse: TopicMetadataResponse = null
+    var t: Throwable = null
+    while(i < brokers.size && !fetchMetaDataSucceeded) {
+      val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i))
+      info("Fetching metadata for topic %s".format(topics))
+      try {
+        topicMetadataResponse = producer.send(topicMetadataRequest)
+        fetchMetaDataSucceeded = true
+      }
+      catch {
+        case e =>
+          warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)
+          t = e
+      } finally {
+        i = i + 1
+        producer.close()
+      }
+    }
+    if(!fetchMetaDataSucceeded){
+      throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
+    }
+    return topicMetadataResponse
+  }
+  
+  /**
+   * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
+   */
+  def parseBrokerList(brokerListStr: String): Seq[Broker] = {
+    val brokersStr = Utils.parseCsvList(brokerListStr)
+
+    brokersStr.zipWithIndex.map(b =>{
+      val brokerStr = b._1
+      val brokerId = b._2
+      val brokerInfos = brokerStr.split(":")
+      val hostName = brokerInfos(0)
+      val port = brokerInfos(1).toInt
+      val creatorId = hostName + "-" + System.currentTimeMillis()
+      new Broker(brokerId, creatorId, hostName, port)
+    })
+  }
+  
+}
\ No newline at end of file
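
A short usage sketch for the client helpers added above (illustrative; the host names and topic are placeholders):

    import kafka.client.ClientUtils

    // comma-separated host:port pairs; list position becomes the synthetic broker id
    val brokers = ClientUtils.parseBrokerList("broker1:9092,broker2:9092")
    // tries each broker in turn, returning the first successful metadata response
    // and throwing KafkaException if every broker fails
    val metadata = ClientUtils.fetchTopicMetadata(Set("my-topic"), brokers)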

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Sat Oct 13 03:35:02 2012
@@ -18,6 +18,7 @@
 package kafka.cluster
 
 import kafka.utils.Utils._
+import kafka.api.ApiUtils._
 import java.nio.ByteBuffer
 import kafka.common.BrokerNotAvailableException
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Sat Oct 13 03:35:02 2012
@@ -42,7 +42,7 @@ class Partition(val topic: String,
   var leaderReplicaIdOpt: Option[Int] = None
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
   private val assignedReplicaMap = new Pool[Int,Replica]
-  private val leaderISRUpdateLock = new Object
+  private val leaderIsrUpdateLock = new Object
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
   this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
@@ -90,7 +90,7 @@ class Partition(val topic: String,
   }
 
   def leaderReplicaIfLocal(): Option[Replica] = {
-    leaderISRUpdateLock synchronized {
+    leaderIsrUpdateLock synchronized {
       leaderReplicaIdOpt match {
         case Some(leaderReplicaId) =>
           if (leaderReplicaId == localBrokerId)
@@ -114,17 +114,17 @@ class Partition(val topic: String,
   /**
    *  If the leaderEpoch of the incoming request is higher than the locally cached epoch, make it the new leader or a follower to the new leader.
    */
-  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
-    leaderISRUpdateLock synchronized {
-      if (leaderEpoch >= leaderAndISR.leaderEpoch){
+  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
+    leaderIsrUpdateLock synchronized {
+      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
         info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become %s request"
-          .format(leaderEpoch, leaderAndISR.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
+          .format(leaderEpoch, leaderAndIsr.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
         return false
       }
       if(isMakingLeader)
-        makeLeader(topic, partitionId, leaderAndISR)
+        makeLeader(topic, partitionId, leaderAndIsr)
       else
-        makeFollower(topic, partitionId, leaderAndISR)
+        makeFollower(topic, partitionId, leaderAndIsr)
       true
     }
   }
@@ -136,17 +136,17 @@ class Partition(val topic: String,
    *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    *  4. set the new leader and ISR
    */
-  private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) {
-    trace("Started to become leader at the request %s".format(leaderAndISR.toString()))
+  private def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) {
+    trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
     // stop replica fetcher thread, if any
     replicaFetcherManager.removeFetcher(topic, partitionId)
 
-    val newInSyncReplicas = leaderAndISR.isr.map(r => getOrCreateReplica(r)).toSet
+    val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
     // reset LogEndOffset for remote replicas
     assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
     inSyncReplicas = newInSyncReplicas
-    leaderEpoch = leaderAndISR.leaderEpoch
-    zkVersion = leaderAndISR.zkVersion
+    leaderEpoch = leaderAndIsr.leaderEpoch
+    zkVersion = leaderAndIsr.zkVersion
     leaderReplicaIdOpt = Some(localBrokerId)
     // we may need to increment high watermark since ISR could be down to 1
     maybeIncrementLeaderHW(getReplica().get)
@@ -158,9 +158,9 @@ class Partition(val topic: String,
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
-    trace("Started to become follower at the request %s".format(leaderAndISR.toString()))
-    val newLeaderBrokerId: Int = leaderAndISR.leader
+  private def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr) = {
+    trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
+    val newLeaderBrokerId: Int = leaderAndIsr.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition %d"
       .format(newLeaderBrokerId, topic, partitionId))
     ZkUtils.getBrokerInfo(zkClient, newLeaderBrokerId) match {
@@ -171,8 +171,8 @@ class Partition(val topic: String,
         val localReplica = getOrCreateReplica()
         localReplica.log.get.truncateTo(localReplica.highWatermark)
         inSyncReplicas = Set.empty[Replica]
-        leaderEpoch = leaderAndISR.leaderEpoch
-        zkVersion = leaderAndISR.zkVersion
+        leaderEpoch = leaderAndIsr.leaderEpoch
+        zkVersion = leaderAndIsr.zkVersion
         leaderReplicaIdOpt = Some(newLeaderBrokerId)
         // start fetcher thread to current leader
         replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
@@ -182,8 +182,8 @@ class Partition(val topic: String,
     }
   }
 
-  def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) {
-    leaderISRUpdateLock synchronized {
+  def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
+    leaderIsrUpdateLock synchronized {
       debug("Recording follower %d position %d for topic %s partition %d.".format(replicaId, offset, topic, partitionId))
       val replica = getOrCreateReplica(replicaId)
       replica.logEndOffset = offset
@@ -198,7 +198,7 @@ class Partition(val topic: String,
             val newInSyncReplicas = inSyncReplicas + replica
             info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", ")))
             // update ISR in ZK and cache
-            updateISR(newInSyncReplicas)
+            updateIsr(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
           }
           maybeIncrementLeaderHW(leaderReplica)
@@ -208,7 +208,7 @@ class Partition(val topic: String,
   }
 
   def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
-    leaderISRUpdateLock synchronized {
+    leaderIsrUpdateLock synchronized {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           val numAcks = inSyncReplicas.count(r => {
@@ -247,8 +247,8 @@ class Partition(val topic: String,
         .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
   }
 
-  def maybeShrinkISR(replicaMaxLagTimeMs: Long,  replicaMaxLagBytes: Long) {
-    leaderISRUpdateLock synchronized {
+  def maybeShrinkIsr(replicaMaxLagTimeMs: Long,  replicaMaxLagBytes: Long) {
+    leaderIsrUpdateLock synchronized {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagBytes)
@@ -257,7 +257,7 @@ class Partition(val topic: String,
             assert(newInSyncReplicas.size > 0)
             info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in zk and in cache
-            updateISR(newInSyncReplicas)
+            updateIsr(newInSyncReplicas)
             // we may need to increment high watermark since ISR could be down to 1
             maybeIncrementLeaderHW(leaderReplica)
             replicaManager.isrShrinkRate.mark()
@@ -289,15 +289,15 @@ class Partition(val topic: String,
     stuckReplicas ++ slowReplicas
   }
 
-  private def updateISR(newISR: Set[Replica]) {
-    info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(", ")))
-    val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
+  private def updateIsr(newIsr: Set[Replica]) {
+    info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString(), zkVersion)
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndIsr.toString(), zkVersion)
     if (updateSucceeded){
-      inSyncReplicas = newISR
+      inSyncReplicas = newIsr
       zkVersion = newVersion
-      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newISR.mkString(","), zkVersion))
+      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
     } else {
       info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
     }

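The updateIsr path above is an optimistic compare-and-set: the cached zkVersion rides along with the write, and the update is skipped if another broker has bumped the version first. A minimal sketch of that pattern against the I0Itec ZkClient follows; the helper name and return shape mirror the conditionalUpdatePersistentPath call above, but the details are assumptions, not the actual ZkUtils code.

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.ZkBadVersionException

    // Compare-and-set write: succeeds only if the node is still at
    // expectedVersion; returns (succeeded, versionAfterWrite).
    def conditionalUpdate(zk: ZkClient, path: String, data: String,
                          expectedVersion: Int): (Boolean, Int) =
      try {
        val stat = zk.writeDataReturnStat(path, data, expectedVersion)
        (true, stat.getVersion)
      } catch {
        case e: ZkBadVersionException => (false, -1)
      }
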
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Sat Oct 13 03:35:02 2012
@@ -25,7 +25,7 @@ import java.util.Properties
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
-import kafka.utils.{Utils, Logging}
+import kafka.utils.{Utils, Logging, ZkUtils, CommandLineUtils}
 import kafka.utils.ZKStringSerializer
 import kafka.serializer.StringDecoder
 
@@ -109,8 +109,7 @@ object ConsoleConsumer extends Logging {
             "skip it instead of halt.")
 
     val options: OptionSet = tryParse(parser, args)
-    Utils.checkRequiredArgs(parser, options, zkConnectOpt)
-
+    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
     if (topicOrFilterOpt.size != 1) {
       error("Exactly one of whitelist/blacklist/topic is required.")
@@ -145,14 +144,14 @@ object ConsoleConsumer extends Logging {
     val connector = Consumer.create(config)
 
     if(options.has(resetBeginningOpt))
-      tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
+      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
         connector.shutdown()
        // if there is no group specified then avoid polluting zookeeper with persistent group data; this is a hack
-        if(!options.has(groupIdOpt))
-          tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt))
+        if(!options.has(groupIdOpt))  
+          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
       }
     })
 

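The two tryCleanupZookeeper call sites above now go through the shared ZkUtils.maybeDeletePath. A best-effort sketch of what such a helper does, assuming it connects, deletes the subtree if present, and deliberately swallows failures since this cleanup is advisory:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZKStringSerializer

    // Best-effort recursive delete: any failure (bad connect string,
    // concurrent deletion) is intentionally ignored.
    def maybeDeletePath(zkUrl: String, dir: String) {
      try {
        val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer)
        zk.deleteRecursive(dir)
        zk.close()
      } catch {
        case _: Throwable => // swallow; cleanup is best effort
      }
    }
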
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Sat Oct 13 03:35:02 2012
@@ -28,6 +28,7 @@ import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
 import kafka.utils.Utils._
 import kafka.common.TopicAndPartition
+import kafka.client.ClientUtils
 
 /**
  *  Usage:
@@ -52,7 +53,7 @@ class ConsumerFetcherManager(private val
           cond.await()
 
         val brokers = getAllBrokersInCluster(zkClient)
-        val topicsMetadata = getTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata
+        val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata
         val leaderForPartitionsMap = new HashMap[(String, Int), Broker]
         topicsMetadata.foreach(
           tmd => {

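Topic metadata lookup moves from kafka.utils.Utils to the new kafka.client.ClientUtils; the same substitution appears in ZookeeperConsumerConnector below. A caller-side sketch of the pattern, assuming an already-connected zkClient:

    import kafka.client.ClientUtils
    import kafka.utils.ZkUtils

    // zkClient is assumed to be a connected org.I0Itec.zkclient.ZkClient.
    val brokers = ZkUtils.getAllBrokersInCluster(zkClient)
    val metadata = ClientUtils.fetchTopicMetadata(Set("my-topic"), brokers).topicsMetadata
    for (tmd <- metadata; pmd <- tmd.partitionsMetadata; leader <- pmd.leader)
      println("%s-%d led by broker %d".format(tmd.topic, pmd.partitionId, leader.id))
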
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala Sat Oct 13 03:35:02 2012
@@ -20,7 +20,7 @@ package kafka.consumer
 import scala.collection._
 import org.I0Itec.zkclient.ZkClient
 import java.util.regex.Pattern
-import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging}
+import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
 
 private[kafka] trait TopicCount {
   def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
@@ -88,7 +88,7 @@ private[kafka] object TopicCount extends
     else {
       var topMap : Map[String,Int] = null
       try {
-        SyncJSON.parseFull(topicCountString) match {
+        Json.parseFull(topicCountString) match {
           case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
           case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
         }

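SyncJSON is renamed to kafka.utils.Json but keeps the parseFull-returns-Option shape, so callers pattern-match exactly as before. A sketch of the idiom with an illustrative payload (the unchecked cast mirrors TopicCount above):

    import kafka.utils.Json

    val payload = """{ "topic1": 2, "topic2": 4 }"""
    Json.parseFull(payload) match {
      case Some(m) => println(m.asInstanceOf[Map[String, Int]]) // cast as in TopicCount
      case None    => throw new RuntimeException("error parsing " + payload)
    }
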
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Sat Oct 13 03:35:02 2012
@@ -31,6 +31,7 @@ import java.util.UUID
 import kafka.serializer.Decoder
 import kafka.utils.ZkUtils._
 import kafka.common._
+import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Utils._
@@ -390,7 +391,7 @@ private[kafka] class ZookeeperConsumerCo
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val brokers = getAllBrokersInCluster(zkClient)
-      val topicsMetadata = getTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
+      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
       val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala Sat Oct 13 03:35:02 2012
@@ -119,7 +119,7 @@ class RequestSendThread(val controllerId
         var response: RequestOrResponse = null
         request.requestId.get match {
           case RequestKeys.LeaderAndIsrKey =>
-            response = LeaderAndISRResponse.readFrom(receive.buffer)
+            response = LeaderAndIsrResponse.readFrom(receive.buffer)
           case RequestKeys.StopReplicaKey =>
             response = StopReplicaResponse.readFrom(receive.buffer)
         }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Sat Oct 13 03:35:02 2012
@@ -65,9 +65,9 @@ private[kafka] class LogManager(val conf
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val topic = Utils.getTopicPartition(dir.getName)._1
-        val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
-        val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
+        val topicPartition = parseTopicPartitionName(dir.getName)
+        val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
+        val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
         val log = new Log(dir, 
                           maxLogFileSize, 
                           config.maxMessageSize, 
@@ -78,10 +78,9 @@ private[kafka] class LogManager(val conf
                           config.logIndexIntervalBytes,
                           time, 
                           config.brokerId)
-        val topicPartition = Utils.getTopicPartition(dir.getName)
-        logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
-        val parts = logs.get(topicPartition._1)
-        parts.put(topicPartition._2, log)
+        logs.putIfNotExists(topicPartition.topic, new Pool[Int, Log]())
+        val parts = logs.get(topicPartition.topic)
+        parts.put(topicPartition.partition, log)
       }
     }
   }
@@ -168,7 +167,7 @@ private[kafka] class LogManager(val conf
   /* Runs through the log removing segments older than a certain age */
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
-    val topic = Utils.getTopicPartition(log.name)._1
+    val topic = parseTopicPartitionName(log.name).topic
     val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
     val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs)
     val total = log.deleteSegments(toBeDeleted)
@@ -180,7 +179,7 @@ private[kafka] class LogManager(val conf
    *  is at least logRetentionSize bytes in size
    */
   private def cleanupSegmentsToMaintainSize(log: Log): Int = {
-    val topic = Utils.getTopicPartition(log.dir.getName)._1
+    val topic = parseTopicPartitionName(log.dir.getName).topic
     val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
     if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
     var diff = log.size - maxLogRetentionSize
@@ -256,5 +255,10 @@ private[kafka] class LogManager(val conf
 
 
   def topics(): Iterable[String] = logs.keys
+  
+  private def parseTopicPartitionName(name: String): TopicAndPartition = {
+    val index = name.lastIndexOf('-')
+    TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
+  }
 
 }

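The new parseTopicPartitionName replaces the tuple-returning Utils.getTopicPartition with a proper TopicAndPartition. Splitting on the last '-' matters because topic names may themselves contain dashes; the sketch below restates the helper with a check of that case:

    import kafka.common.TopicAndPartition

    // Log directories are named "<topic>-<partition>", and topics may
    // contain dashes, so split on the *last* '-'.
    def parseTopicPartitionName(name: String): TopicAndPartition = {
      val index = name.lastIndexOf('-')
      TopicAndPartition(name.substring(0, index), name.substring(index + 1).toInt)
    }

    assert(parseTopicPartitionName("my-topic-0") == TopicAndPartition("my-topic", 0))
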
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala Sat Oct 13 03:35:02 2012
@@ -116,7 +116,7 @@ class Message(val buffer: ByteBuffer) {
     buffer.rewind()
     
     // now compute the checksum and fill it in
-    Utils.putUnsignedInt(buffer, CrcOffset, computeChecksum)
+    Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
   }
   
   def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) = 
@@ -140,7 +140,7 @@ class Message(val buffer: ByteBuffer) {
   /**
    * Retrieve the previously computed CRC for this message
    */
-  def checksum: Long = Utils.getUnsignedInt(buffer, CrcOffset)
+  def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset)
   
     /**
    * Returns true if the crc stored with the message matches the crc computed off the message contents

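The renamed pair reads and writes the CRC as an unsigned 32-bit value carried in a Long. This sketch shows the usual masking approach; the real implementations live in kafka.utils.Utils and may differ in detail:

    import java.nio.ByteBuffer

    // A CRC32 is unsigned 32-bit, so it travels in a Long and is
    // masked on the way out of the signed Int that getInt returns.
    def readUnsignedInt(buffer: ByteBuffer, index: Int): Long =
      buffer.getInt(index) & 0xffffffffL

    // Store the low 32 bits of value at the given index.
    def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long) {
      buffer.putInt(index, (value & 0xffffffffL).toInt)
    }
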
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala Sat Oct 13 03:35:02 2012
@@ -28,7 +28,7 @@ class KafkaMetricsConfig(props: Verifiab
    * Comma-separated list of reporter types. These classes should be on the
    * classpath and will be instantiated at run-time.
    */
-  val reporters = Utils.getCSVList(props.getString("kafka.metrics.reporters", ""))
+  val reporters = Utils.parseCsvList(props.getString("kafka.metrics.reporters", ""))
 
   /**
    * The metrics polling interval (in seconds).

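getCSVList becomes parseCsvList. The behavior one would expect is split, trim, and drop empties, with a null-safe guard since callers such as ProducerConfig below pass a possibly-null default. A sketch under those assumptions:

    // Split on commas, trim, and drop empty entries; null-safe.
    def parseCsvList(csvList: String): Seq[String] =
      if (csvList == null || csvList.isEmpty) Seq.empty[String]
      else csvList.split(",").map(_.trim).filter(_ != "")

    // e.g. parseCsvList(" a, b ,,c ") yields Seq("a", "b", "c")
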
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Sat Oct 13 03:35:02 2012
@@ -21,6 +21,8 @@ import kafka.api.{TopicMetadataRequest, 
 import kafka.common.KafkaException
 import kafka.utils.{Logging, Utils}
 import kafka.common.ErrorMapping
+import kafka.cluster.Broker
+import kafka.client.ClientUtils
 
 
 class BrokerPartitionInfo(producerConfig: ProducerConfig,
@@ -28,7 +30,7 @@ class BrokerPartitionInfo(producerConfig
                           topicPartitionInfo: HashMap[String, TopicMetadata])
         extends Logging {
   val brokerList = producerConfig.brokerList
-  val brokers = Utils.getAllBrokersFromBrokerList(brokerList)
+  val brokers = ClientUtils.parseBrokerList(brokerList)
 
   /**
    * Return a sequence of (brokerId, numPartitions).
@@ -71,7 +73,7 @@ class BrokerPartitionInfo(producerConfig
    */
   def updateInfo(topics: Set[String]) = {
     var topicsMetadata: Seq[TopicMetadata] = Nil
-    val topicMetadataResponse = Utils.getTopicMetadata(topics, brokers)
+    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers)
     topicsMetadata = topicMetadataResponse.topicsMetadata
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{
@@ -88,6 +90,7 @@ class BrokerPartitionInfo(producerConfig
     })
     producerPool.updateProducer(topicsMetadata)
   }
+  
 }
 
 case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

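Broker-list parsing likewise moves behind ClientUtils.parseBrokerList. A sketch of the shape of such a parser, assuming the 0.8-era id:host:port CSV format; both the format and the simplified broker type here are assumptions, not the real kafka.cluster.Broker:

    // Illustrative stand-in for kafka.cluster.Broker.
    case class BrokerInfo(id: Int, host: String, port: Int)

    // Assuming a "id:host:port,id:host:port" broker.list string.
    def parseBrokerList(brokerListStr: String): Seq[BrokerInfo] =
      for (entry <- brokerListStr.split(",").toSeq if entry.trim != "") yield {
        val Array(id, host, port) = entry.trim.split(":")
        BrokerInfo(id.toInt, host, port.toInt)
      }

    // parseBrokerList("0:localhost:9092,1:localhost:9093")
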
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/DefaultPartitioner.scala Sat Oct 13 03:35:02 2012
@@ -17,6 +17,8 @@
 
 package kafka.producer
 
+import kafka.utils.Utils
+
 private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
   private val random = new java.util.Random
   
@@ -24,6 +26,6 @@ private[kafka] class DefaultPartitioner[
     if(key == null)
       random.nextInt(numPartitions)
     else
-      math.abs(key.hashCode) % numPartitions
+      Utils.abs(key.hashCode) % numPartitions
   }
 }

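Switching from math.abs to Utils.abs closes a real edge case: math.abs(Int.MinValue) is still Int.MinValue, so a key hashing to Int.MinValue could yield a negative partition id. Masking off the sign bit avoids that; a sketch of the idea (the real Utils.abs may be written differently):

    // Sign-bit masking: non-negative for every input, including
    // Int.MinValue. Not a true absolute value for other negatives,
    // but that does not matter for partition selection.
    def abs(n: Int): Int = n & 0x7fffffff

    assert(math.abs(Int.MinValue) == Int.MinValue) // the trap
    assert(abs(Int.MinValue) == 0)                 // safe for modulo
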
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Sat Oct 13 03:35:02 2012
@@ -18,6 +18,7 @@ package kafka.producer
 
 import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler}
 import kafka.utils._
+import java.util.Random
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
 import java.util.concurrent.atomic.AtomicBoolean
@@ -33,13 +34,14 @@ extends Logging {
 
   private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize)
 
+  private val random = new Random
   private var sync: Boolean = true
   private var producerSendThread: ProducerSendThread[K,V] = null
   config.producerType match {
     case "sync" =>
     case "async" =>
       sync = false
-      val asyncProducerID = Utils.getNextRandomInt
+      val asyncProducerID = random.nextInt(Int.MaxValue)
       producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue,
         eventHandler, config.queueTime, config.batchSize)
       producerSendThread.start
@@ -49,8 +51,8 @@ extends Logging {
   def this(config: ProducerConfig) =
     this(config,
          new DefaultEventHandler[K,V](config,
-                                      Utils.getObject[Partitioner[K]](config.partitionerClass),
-                                      Utils.getObject[Encoder[V]](config.serializerClass),
+                                      Utils.createObject[Partitioner[K]](config.partitionerClass),
+                                      Utils.createObject[Encoder[V]](config.serializerClass),
                                       new ProducerPool(config)))
 
   /**

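getObject becomes createObject, which turns the partitioner.class and serializer.class strings into live instances via reflection. A sketch assuming a no-arg constructor (the real helper may handle more):

    // Reflectively instantiate a class by name via its no-arg
    // constructor, e.g. for partitioner.class / serializer.class.
    def createObject[T <: AnyRef](className: String): T =
      Class.forName(className).newInstance().asInstanceOf[T]
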
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Sat Oct 13 03:35:02 2012
@@ -78,7 +78,7 @@ class ProducerConfig private (val props:
    *
    *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */
-  val compressedTopics = Utils.getCSVList(props.getString("compressed.topics", null))
+  val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null))
 
   /**
    * The producer using the zookeeper software load balancer maintains a ZK cache that gets

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Oct 13 03:35:02 2012
@@ -24,6 +24,7 @@ import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging}
 import scala.collection.{Seq, Map}
 import scala.collection.mutable.{ListBuffer, HashMap}
+import java.util.concurrent.atomic._
 import kafka.api.{TopicMetadata, ProducerRequest}
 
 
@@ -35,6 +36,7 @@ class DefaultEventHandler[K,V](config: P
   extends EventHandler[K,V] with Logging {
   val isSync = ("sync" == config.producerType)
 
+  val counter = new AtomicInteger(0)
   val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
 
   private val lock = new Object()
@@ -185,8 +187,11 @@ class DefaultEventHandler[K,V](config: P
     if(numPartitions <= 0)
       throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
         "\n Valid values are > 0")
-    val partition = if(key == null) Utils.getNextRandomInt(numPartitions)
-    else partitioner.partition(key, numPartitions)
+    val partition = 
+      if(key == null) 
+        Utils.abs(counter.getAndIncrement()) % numPartitions
+      else 
+        partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
         "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")

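For null keys the handler now cycles a shared counter instead of drawing a fresh random number per message, spreading sends round-robin across partitions; combined with Utils.abs this stays valid even after the counter wraps past Int.MaxValue. A standalone sketch:

    import java.util.concurrent.atomic.AtomicInteger

    val counter = new AtomicInteger(0)
    def abs(n: Int): Int = n & 0x7fffffff // as in the DefaultPartitioner note above

    // Cycles 0,1,2,0,1,2,... over 3 partitions and keeps producing
    // valid ids even after getAndIncrement wraps to Int.MinValue.
    def nextPartition(numPartitions: Int): Int =
      abs(counter.getAndIncrement()) % numPartitions
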
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Oct 13 03:35:02 2012
@@ -57,22 +57,22 @@ class KafkaApis(val requestChannel: Requ
       case RequestKeys.FetchKey => handleFetchRequest(request)
       case RequestKeys.OffsetsKey => handleOffsetRequest(request)
       case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
-      case RequestKeys.LeaderAndIsrKey => handleLeaderAndISRRequest(request)
+      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
       case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
       case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
     }
     request.apiLocalCompleteTimeNs = SystemTime.nanoseconds
   }
 
-  def handleLeaderAndISRRequest(request: RequestChannel.Request) {
-    val leaderAndISRRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
+  def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
+    val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
-    trace("Handling leader and isr request " + leaderAndISRRequest)
+      requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest)
+    trace("Handling leader and ISR request " + leaderAndIsrRequest)
     try {
-      val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest)
-      val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
-      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse)))
+      val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, responseMap)
+      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
         fatal("Disk error during leadership change.", e)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1397765&r1=1397764&r2=1397765&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Sat Oct 13 03:35:02 2012
@@ -22,6 +22,7 @@ import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
 import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig}
+import scala.collection._
 
 /**
  * Configuration settings for the kafka server
@@ -73,32 +74,32 @@ class KafkaConfig private (val props: Ve
   /* the default number of log partitions per topic */
   val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
   
-  /* the directory in which the log data is kept */
+  /* the directories in which the log data is kept */
   val logDir = props.getString("log.dir")
   
   /* the maximum size of a single log file */
   val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
 
   /* the maximum size of a single log file for some specific topic */
-  val logFileSizeMap = Utils.getTopicFileSize(props.getString("topic.log.file.size", ""))
+  val logFileSizeMap = props.getMap("topic.log.file.size", _.toInt > 0).mapValues(_.toInt)
 
   /* the maximum time before a new log segment is rolled out */
   val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours before rolling out a new log segment for some specific topic */
-  val logRollHoursMap = Utils.getTopicRollHours(props.getString("topic.log.roll.hours", ""))
+  val logRollHoursMap = props.getMap("topic.log.roll.hours", _.toInt > 0).mapValues(_.toInt)  
 
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours to keep a log file before deleting it for some specific topic */
-  val logRetentionHoursMap = Utils.getTopicRetentionHours(props.getString("topic.log.retention.hours", ""))
+  val logRetentionHoursMap = props.getMap("topic.log.retention.hours", _.toInt > 0).mapValues(_.toInt)
 
   /* the maximum size of the log before deleting it */
   val logRetentionSize = props.getLong("log.retention.size", -1)
 
   /* the maximum size of the log for some specific topic before deleting it */
-  val logRetentionSizeMap = Utils.getTopicRetentionSize(props.getString("topic.log.retention.size", ""))
+  val logRetentionSizeMap = props.getMap("topic.log.retention.size", _.toLong > 0).mapValues(_.toLong)
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
@@ -113,7 +114,7 @@ class KafkaConfig private (val props: Ve
   val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before being flushed to disk, e.g., topic1:3000,topic2:6000 */
-  val flushIntervalMap = Utils.getTopicFlushIntervals(props.getString("topic.flush.intervals.ms", ""))
+  val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
 
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
   val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms",  3000)
@@ -161,4 +162,5 @@ class KafkaConfig private (val props: Ve
   /* number of fetcher threads used to replicate messages from a source broker.
    * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
   val numReplicaFetchers = props.getInt("replica.fetchers", 1)
+  
  }

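All of the one-off per-topic parsers (getTopicFileSize, getTopicRollHours, getTopicRetentionHours, and so on) collapse into a single VerifiableProperties.getMap that splits a topic1:val1,topic2:val2 string and validates each value with the supplied predicate. A sketch of such a parser; this is illustrative, the real method lives on VerifiableProperties:

    // Parse "k1:v1,k2:v2" into a Map, rejecting values that fail the
    // supplied check (e.g. _.toInt > 0 as in KafkaConfig above).
    def getMap(propString: String, valid: String => Boolean): Map[String, String] =
      if (propString == null || propString.isEmpty) Map.empty
      else propString.split(",").map { entry =>
        val Array(k, v) = entry.split(":").map(_.trim)
        require(valid(v), "Invalid entry '" + entry + "'")
        k -> v
      }.toMap
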

