kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [21/30] git commit: KAFKA-642 Fixes to protocol. Patch reviewed by Neha and Joel.
Date Tue, 18 Dec 2012 17:44:12 GMT
KAFKA-642 Fixes to protocol. Patch reviewed by Neha and Joel.



git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1417734 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1144765
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1144765
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1144765

Branch: refs/heads/trunk
Commit: e11447650a9a9fbaebff0e5c6a369375e087f1ae
Parents: 6ca7b3a
Author: Edward Jay Kreps <jkreps@apache.org>
Authored: Thu Dec 6 04:23:44 2012 +0000
Committer: Edward Jay Kreps <jkreps@apache.org>
Committed: Thu Dec 6 04:23:44 2012 +0000

----------------------------------------------------------------------
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |    8 +-
 .../scala/kafka/api/LeaderAndIsrResponse.scala     |    6 +-
 core/src/main/scala/kafka/api/OffsetRequest.scala  |    6 +-
 core/src/main/scala/kafka/api/OffsetResponse.scala |    6 +-
 .../main/scala/kafka/api/StopReplicaRequest.scala  |    8 +-
 .../main/scala/kafka/api/StopReplicaResponse.scala |    6 +-
 core/src/main/scala/kafka/api/TopicMetadata.scala  |  120 ++++-----------
 .../scala/kafka/api/TopicMetadataRequest.scala     |   11 +-
 .../scala/kafka/api/TopicMetadataResponse.scala    |   32 +++-
 core/src/main/scala/kafka/client/ClientUtils.scala |    3 +-
 core/src/main/scala/kafka/cluster/Broker.scala     |   20 +--
 .../main/scala/kafka/consumer/SimpleConsumer.scala |   19 ++-
 .../scala/kafka/javaapi/TopicMetadataRequest.scala |    7 +-
 core/src/main/scala/kafka/message/Message.scala    |   64 +++++----
 .../main/scala/kafka/producer/ProducerConfig.scala |    2 +-
 .../scala/kafka/producer/SyncProducerConfig.scala  |    4 -
 .../kafka/producer/async/DefaultEventHandler.scala |    7 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   16 +-
 .../main/scala/kafka/server/KafkaZooKeeper.scala   |    3 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    4 +-
 .../api/RequestResponseSerializationTest.scala     |   19 ++-
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |    2 +-
 .../scala/unit/kafka/integration/FetcherTest.scala |    2 +-
 .../test/scala/unit/kafka/log/LogOffsetTest.scala  |    8 +-
 .../unit/kafka/network/SocketServerTest.scala      |    2 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    |   15 +-
 .../unit/kafka/producer/SyncProducerTest.scala     |    6 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |    2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   38 +++--
 29 files changed, 222 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index f57de6e..7459f4a 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -87,6 +87,7 @@ object LeaderAndIsrRequest {
 
   def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
     val controllerEpoch = buffer.getInt
@@ -106,11 +107,12 @@ object LeaderAndIsrRequest {
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)
 
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
+    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
   }
 }
 
 case class LeaderAndIsrRequest (versionId: Short,
+                                correlationId: Int,
                                 clientId: String,
                                 ackTimeoutMs: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
@@ -119,12 +121,13 @@ case class LeaderAndIsrRequest (versionId: Short,
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
   def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
+    this(LeaderAndIsrRequest.CurrentVersion, 0, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
       partitionStateInfos, liveBrokers, controllerEpoch)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
     buffer.putInt(controllerEpoch)
@@ -141,6 +144,7 @@ case class LeaderAndIsrRequest (versionId: Short,
   def sizeInBytes(): Int = {
     var size =
       2 /* version id */ +
+      4 /* correlation id */ + 
       (2 + clientId.length) /* client id */ +
       4 /* ack timeout */ +
       4 /* controller epoch */ +

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index f2e86be..c8f1630 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -27,6 +27,7 @@ import collection.Map
 object LeaderAndIsrResponse {
   def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
     val errorCode = buffer.getShort
     val numEntries = buffer.getInt
     val responseMap = new HashMap[(String, Int), Short]()
@@ -36,18 +37,20 @@ object LeaderAndIsrResponse {
       val partitionErrorCode = buffer.getShort
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new LeaderAndIsrResponse(versionId, responseMap, errorCode)
+    new LeaderAndIsrResponse(versionId, correlationId, responseMap, errorCode)
   }
 }
 
 
 case class LeaderAndIsrResponse(versionId: Short,
+                                correlationId: Int,
                                 responseMap: Map[(String, Int), Short],
                                 errorCode: Short = ErrorMapping.NoError)
         extends RequestOrResponse {
   def sizeInBytes(): Int ={
     var size =
       2 /* version id */ +
+      4 /* correlation id */ + 
       2 /* error code */ +
       4 /* number of responses */
     for ((key, value) <- responseMap) {
@@ -61,6 +64,7 @@ case class LeaderAndIsrResponse(versionId: Short,
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId)
     buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index ee3dff5..5239538 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -33,6 +33,7 @@ object OffsetRequest {
 
   def readFrom(buffer: ByteBuffer): OffsetRequest = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val replicaId = buffer.getInt
     val topicCount = buffer.getInt
@@ -54,16 +55,18 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
 
 case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
                          versionId: Short = OffsetRequest.CurrentVersion,
+                         correlationId: Int = 0,
                          clientId: String = OffsetRequest.DefaultClientId,
                          replicaId: Int = Request.OrdinaryConsumerId)
         extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
 
-  def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, replicaId)
+  def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(replicaId)
 
@@ -83,6 +86,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
 
   def sizeInBytes =
     2 + /* versionId */
+    4 + /* correlationId */
     shortStringLength(clientId) +
     4 + /* replicaId */
     4 + /* topic count */

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 10a7715..7818b66 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -26,6 +26,7 @@ object OffsetResponse {
 
   def readFrom(buffer: ByteBuffer): OffsetResponse = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
     val numTopics = buffer.getInt
     val pairs = (1 to numTopics).flatMap(_ => {
       val topic = readShortString(buffer)
@@ -38,7 +39,7 @@ object OffsetResponse {
         (TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets))
       })
     })
-    OffsetResponse(versionId, Map(pairs:_*))
+    OffsetResponse(versionId, correlationId, Map(pairs:_*))
   }
 
 }
@@ -48,6 +49,7 @@ case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
 
 
 case class OffsetResponse(versionId: Short,
+                          correlationId: Int,
                           partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
         extends RequestOrResponse {
 
@@ -57,6 +59,7 @@ case class OffsetResponse(versionId: Short,
 
   val sizeInBytes = {
     2 + /* versionId */
+    4 + /* correlation id */
     4 + /* topic count */
     offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
       val (topic, errorAndOffsetsMap) = currTopic
@@ -75,6 +78,7 @@ case class OffsetResponse(versionId: Short,
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId)
     buffer.putInt(offsetsGroupedByTopic.size) // topic count
     offsetsGroupedByTopic.foreach {
       case((topic, errorAndOffsetsMap)) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 9088fa9..6583d64 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -31,6 +31,7 @@ object StopReplicaRequest extends Logging {
 
   def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
     val controllerEpoch = buffer.getInt
@@ -45,11 +46,12 @@ object StopReplicaRequest extends Logging {
     (1 to topicPartitionPairCount) foreach { _ =>
       topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
     }
-    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
+    StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
   }
 }
 
 case class StopReplicaRequest(versionId: Short,
+                              correlationId: Int,
                               clientId: String,
                               ackTimeoutMs: Int,
                               deletePartitions: Boolean,
@@ -58,12 +60,13 @@ case class StopReplicaRequest(versionId: Short,
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
 
   def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
-    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
+    this(StopReplicaRequest.CurrentVersion, 0, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
          deletePartitions, partitions, controllerEpoch)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
     buffer.putInt(controllerEpoch)
@@ -78,6 +81,7 @@ case class StopReplicaRequest(versionId: Short,
   def sizeInBytes(): Int = {
     var size =
       2 + /* versionId */
+      4 + /* correlation id */
       ApiUtils.shortStringLength(clientId) +
       4 + /* ackTimeoutMs */
       4 + /* controller epoch */

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index e0d3de6..6062a0c 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -27,6 +27,7 @@ import kafka.api.ApiUtils._
 object StopReplicaResponse {
   def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
     val errorCode = buffer.getShort
     val numEntries = buffer.getInt
 
@@ -37,17 +38,19 @@ object StopReplicaResponse {
       val partitionErrorCode = buffer.getShort()
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new StopReplicaResponse(versionId, responseMap.toMap, errorCode)
+    new StopReplicaResponse(versionId, correlationId, responseMap.toMap, errorCode)
   }
 }
 
 
 case class StopReplicaResponse(val versionId: Short,
+                               val correlationId: Int,
                                val responseMap: Map[(String, Int), Short],
                                val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
   def sizeInBytes(): Int ={
     var size =
       2 /* version id */ +
+      4 /* correlation id */ + 
       2 /* error code */ +
       4 /* number of responses */
     for ((key, value) <- responseMap) {
@@ -61,6 +64,7 @@ case class StopReplicaResponse(val versionId: Short,
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId)
     buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index e2d03e8..409de76 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -21,57 +21,29 @@ import kafka.cluster.Broker
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.utils.Logging
-import collection.mutable.ListBuffer
-import kafka.common.{KafkaException, ErrorMapping}
-
-/**
- * topic (2 bytes + topic.length)
- * number of partitions (4 bytes)
- *
- * partition id (4 bytes)
- *
- * does leader exist (1 byte)
- * leader info (4 + creator.length + host.length + 4 (port) + 4 (id))
- * number of replicas (2 bytes)
- * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
- * number of in sync replicas (2 bytes)
- * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
- *
- * does log metadata exist (1 byte)
- * number of log segments (4 bytes)
- * total size of log in bytes (8 bytes)
- *
- * number of log segments (4 bytes)
- * beginning offset (8 bytes)
- * last modified timestamp (8 bytes)
- * size of log segment (8 bytes)
- *
- */
-
-sealed trait LeaderRequest { def requestId: Byte }
-case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 }
-case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
+import collection.mutable.ArrayBuffer
+import kafka.common._
 
 object TopicMetadata {
+  
+  val NoLeaderNodeId = -1
 
-  def readFrom(buffer: ByteBuffer): TopicMetadata = {
+  def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): TopicMetadata = {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val topic = readShortString(buffer)
     val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
-    val partitionsMetadata = new ListBuffer[PartitionMetadata]()
+    val partitionsMetadata = new ArrayBuffer[PartitionMetadata]()
     for(i <- 0 until numPartitions)
-      partitionsMetadata += PartitionMetadata.readFrom(buffer)
+      partitionsMetadata += PartitionMetadata.readFrom(buffer, brokers)
     new TopicMetadata(topic, partitionsMetadata, errorCode)
   }
 }
 
 case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging {
   def sizeInBytes: Int = {
-    var size: Int = 2   /* error code */
-    size += shortStringLength(topic)
-    size += partitionsMetadata.foldLeft(4 /* number of partitions */)(_ + _.sizeInBytes)
-    debug("Size of topic metadata = " + size)
-    size
+    2 /* error code */ + 
+    shortStringLength(topic) + 
+    4 + partitionsMetadata.map(_.sizeInBytes).sum /* size and partition data array */
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -87,40 +59,24 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
 
 object PartitionMetadata {
 
-  def readFrom(buffer: ByteBuffer): PartitionMetadata = {
+  def readFrom(buffer: ByteBuffer, brokers: Map[Int, Broker]): PartitionMetadata = {
     val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue))
     val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
-    val doesLeaderExist = getLeaderRequest(buffer.get)
-    val leader = doesLeaderExist match {
-      case LeaderExists => /* leader exists */
-        Some(Broker.readFrom(buffer))
-      case LeaderDoesNotExist => None
-    }
+    val leaderId = buffer.getInt
+    val leader = brokers.get(leaderId)
 
     /* list of all replicas */
-    val numReplicas = readShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
-    val replicas = new Array[Broker](numReplicas)
-    for(i <- 0 until numReplicas) {
-      replicas(i) = Broker.readFrom(buffer)
-    }
+    val numReplicas = readIntInRange(buffer, "number of all replicas", (0, Int.MaxValue))
+    val replicaIds = (0 until numReplicas).map(_ => buffer.getInt)
+    val replicas = replicaIds.map(brokers)
 
     /* list of in-sync replicas */
-    val numIsr = readShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
-    val isr = new Array[Broker](numIsr)
-    for(i <- 0 until numIsr) {
-      isr(i) = Broker.readFrom(buffer)
-    }
+    val numIsr = readIntInRange(buffer, "number of in-sync replicas", (0, Int.MaxValue))
+    val isrIds = (0 until numIsr).map(_ => buffer.getInt)
+    val isr = isrIds.map(brokers)
 
     new PartitionMetadata(partitionId, leader, replicas, isr, errorCode)
   }
-
-  private def getLeaderRequest(requestId: Byte): LeaderRequest = {
-    requestId match {
-      case LeaderExists.requestId => LeaderExists
-      case LeaderDoesNotExist.requestId => LeaderDoesNotExist
-      case _ => throw new KafkaException("Unknown leader request id " + requestId)
-    }
-  }
 }
 
 case class PartitionMetadata(partitionId: Int, 
@@ -129,42 +85,28 @@ case class PartitionMetadata(partitionId: Int,
                              isr: Seq[Broker] = Seq.empty,
                              errorCode: Short = ErrorMapping.NoError) extends Logging {
   def sizeInBytes: Int = {
-    var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/
-
-    leader match {
-      case Some(l) => size += l.sizeInBytes
-      case None =>
-    }
-
-    size += 2 /* number of replicas */
-    size += replicas.foldLeft(0)(_ + _.sizeInBytes)
-    size += 2 /* number of in sync replicas */
-    size += isr.foldLeft(0)(_ + _.sizeInBytes)
-
-    debug("Size of partition metadata = " + size)
-    size
+    2 /* error code */ + 
+    4 /* partition id */ + 
+    4 /* leader */ + 
+    4 + 4 * replicas.size /* replica array */ + 
+    4 + 4 * isr.size /* isr array */
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(errorCode)
     buffer.putInt(partitionId)
 
-    /* if leader exists*/
-    leader match {
-      case Some(l) =>
-        buffer.put(LeaderExists.requestId)
-        /* leader id host_name port */
-        l.writeTo(buffer)
-      case None => buffer.put(LeaderDoesNotExist.requestId)
-    }
+    /* leader */
+    val leaderId = if(leader.isDefined) leader.get.id else TopicMetadata.NoLeaderNodeId
+    buffer.putInt(leaderId)
 
     /* number of replicas */
-    buffer.putShort(replicas.size.toShort)
-    replicas.foreach(r => r.writeTo(buffer))
+    buffer.putInt(replicas.size)
+    replicas.foreach(r => buffer.putInt(r.id))
 
     /* number of in-sync replicas */
-    buffer.putShort(isr.size.toShort)
-    isr.foreach(r => r.writeTo(buffer))
+    buffer.putInt(isr.size)
+    isr.foreach(r => buffer.putInt(r.id))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 70c42e3..0a99779 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -33,6 +33,7 @@ object TopicMetadataRequest extends Logging {
 
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue))
     val topics = new ListBuffer[String]()
@@ -40,26 +41,28 @@ object TopicMetadataRequest extends Logging {
       topics += readShortString(buffer)
     val topicsList = topics.toList
     debug("topic = %s".format(topicsList.head))
-    new TopicMetadataRequest(versionId, clientId, topics.toList)
+    new TopicMetadataRequest(versionId, clientId, topics.toList, correlationId)
   }
 }
 
 case class TopicMetadataRequest(val versionId: Short,
                                 val clientId: String,
-                                val topics: Seq[String])
+                                val topics: Seq[String],
+                                val correlationId: Int)
  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
 def this(topics: Seq[String]) =
-  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics)
+  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, 0)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId) // correlation id not set yet
     writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
   }
 
   def sizeInBytes(): Int = {
-    2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
+    2 + 4 + shortStringLength(clientId) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index 25068d1..0631201 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -17,30 +17,46 @@
 
 package kafka.api
 
+import kafka.cluster.Broker
 import java.nio.ByteBuffer
 
 object TopicMetadataResponse {
 
   def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
     val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val brokerCount = buffer.getInt
+    val brokers = (0 until brokerCount).map(_ => Broker.readFrom(buffer))
+    val brokerMap = brokers.map(b => (b.id, b)).toMap
     val topicCount = buffer.getInt
-    val topicsMetadata = new Array[TopicMetadata](topicCount)
-    for( i <- 0 until topicCount) {
-      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
-    }
-    new TopicMetadataResponse(versionId, topicsMetadata.toSeq)
+    val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
+    new TopicMetadataResponse(versionId, topicsMetadata, correlationId)
   }
 }
 
 case class TopicMetadataResponse(versionId: Short,
-                                 topicsMetadata: Seq[TopicMetadata]) extends RequestOrResponse
-{
-  val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes)
+                                 topicsMetadata: Seq[TopicMetadata],
+                                 correlationId: Int) extends RequestOrResponse {
+  val sizeInBytes: Int = {
+    val brokers = extractBrokers(topicsMetadata).values
+    2 + 4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
+  }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    /* brokers */
+    val brokers = extractBrokers(topicsMetadata).values
+    buffer.putInt(brokers.size)
+    brokers.foreach(_.writeTo(buffer))
     /* topic metadata */
     buffer.putInt(topicsMetadata.length)
     topicsMetadata.foreach(_.writeTo(buffer))
   }
+    
+  def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
+    val parts = topicsMetadata.flatMap(_.partitionsMetadata)
+    val brokers = parts.flatMap(_.replicas) ++ parts.map(_.leader).collect{case Some(l) => l}
+    brokers.map(b => (b.id, b)).toMap
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index cc4df5d..c61833b 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -75,8 +75,7 @@ object ClientUtils extends Logging{
       val brokerInfos = brokerStr.split(":")
       val hostName = brokerInfos(0)
       val port = brokerInfos(1).toInt
-      val creatorId = hostName + "-" + System.currentTimeMillis()
-      new Broker(brokerId, creatorId, hostName, port)
+      new Broker(brokerId, hostName, port)
     })
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 6e072bf..ffedecd 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -31,38 +31,32 @@ private[kafka] object Broker {
     if(brokerInfoString == null)
       throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
     val brokerInfo = brokerInfoString.split(":")
-    new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
+    new Broker(id, brokerInfo(0), brokerInfo(1).toInt)
   }
 
   def readFrom(buffer: ByteBuffer): Broker = {
     val id = buffer.getInt
-    val creatorId = readShortString(buffer)
     val host = readShortString(buffer)
     val port = buffer.getInt
-    new Broker(id, creatorId, host, port)
+    new Broker(id, host, port)
   }
 }
 
-private[kafka] case class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
+private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
   
-  override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
+  override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
 
-  def getZkString(): String = new String(creatorId + ":" + host + ":" + port)
+  def getZkString(): String = host + ":" + port
 
-  def getConnectionString(): String = new String(host + ":" + port)
+  def getConnectionString(): String = host + ":" + port
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)
-    writeShortString(buffer, creatorId)
     writeShortString(buffer, host)
     buffer.putInt(port)
   }
 
-  def sizeInBytes: Int = {
-    val size = shortStringLength(creatorId) + shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
-    debug("Size of broker info = " + size)
-    size
-  }
+  def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
 
   override def equals(obj: Any): Boolean = {
     obj match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 7ecd11f..5e1e6ab 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -30,8 +30,12 @@ import kafka.cluster.Broker
 
 
 object SimpleConsumer extends Logging {
-  def earliestOrLatestOffset(broker: Broker, topic: String, partitionId: Int, earliestOrLatest: Long,
-                             clientId: String, isFromOrdinaryConsumer: Boolean): Long = {
+  def earliestOrLatestOffset(broker: Broker, 
+                             topic: String, 
+                             partitionId: Int, 
+                             earliestOrLatest: Long,
+                             clientId: String, 
+                             isFromOrdinaryConsumer: Boolean): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
     try {
@@ -42,7 +46,7 @@ object SimpleConsumer extends Logging {
         new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
       else
         new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
-                          Request.DebuggingConsumerId)
+                          0, Request.DebuggingConsumerId)
       producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     } catch {
       case e =>
@@ -55,8 +59,13 @@ object SimpleConsumer extends Logging {
     producedOffset
   }
 
-  def earliestOrLatestOffset(zkClient: ZkClient, topic: String, brokerId: Int, partitionId: Int,
-                             earliestOrLatest: Long, clientId: String, isFromOrdinaryConsumer: Boolean = true): Long = {
+  def earliestOrLatestOffset(zkClient: ZkClient, 
+                             topic: String, 
+                             brokerId: Int, 
+                             partitionId: Int,
+                             earliestOrLatest: Long, 
+                             clientId: String, 
+                             isFromOrdinaryConsumer: Boolean = true): Long = {
     val cluster = getCluster(zkClient)
     val broker = cluster.getBroker(brokerId) match {
       case Some(b) => b

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index ad98b75..dbf04fd 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -20,14 +20,15 @@ import kafka.api._
 import java.nio.ByteBuffer
 import scala.collection.JavaConversions
 
-class TopicMetadataRequest(val versionId: Short,
+class TopicMetadataRequest(val correlationId: Int,
+                           val versionId: Short,
                            val clientId: String,
                            val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
   val underlying: kafka.api.TopicMetadataRequest =
-    new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics))
+    new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics), correlationId)
 
   def this(topics: java.util.List[String]) =
-    this(kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+    this(0, kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
 
   def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index aedab42..38c0a9a 100644
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -38,12 +38,15 @@ object Message {
   val KeySizeOffset = AttributesOffset + AttributesLength
   val KeySizeLength = 4
   val KeyOffset = KeySizeOffset + KeySizeLength
-  val MessageOverhead = KeyOffset
+  val ValueSizeLength = 4
+  
+  /** The number of overhead bytes in a message */
+  val MessageOverhead = KeyOffset + ValueSizeLength
   
   /**
    * The minimum valid size for the message header
    */
-  val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength
+  val MinHeaderSize = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength
   
   /**
    * The current "magic" value
@@ -97,22 +100,24 @@ class Message(val buffer: ByteBuffer) {
                              Message.AttributesLength + 
                              Message.KeySizeLength + 
                              (if(key == null) 0 else key.length) + 
+                             Message.ValueSizeLength + 
                              (if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset)))
     // skip crc, we will fill that in at the end
-    buffer.put(MagicOffset, CurrentMagicValue)
-    var attributes:Byte = 0
+    buffer.position(MagicOffset)
+    buffer.put(CurrentMagicValue)
+    var attributes: Byte = 0
     if (codec.codec > 0)
       attributes =  (attributes | (CompressionCodeMask & codec.codec)).toByte
-    buffer.put(AttributesOffset, attributes)
+    buffer.put(attributes)
     if(key == null) {
-      buffer.putInt(KeySizeOffset, -1)
-      buffer.position(KeyOffset)
+      buffer.putInt(-1)
     } else {
-      buffer.putInt(KeySizeOffset, key.length)
-      buffer.position(KeyOffset)
+      buffer.putInt(key.length)
       buffer.put(key, 0, key.length)
     }
-    buffer.put(bytes, payloadOffset, if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset)
+    val size = if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset
+    buffer.putInt(size)
+    buffer.put(bytes, payloadOffset, size)
     buffer.rewind()
     
     // now compute the checksum and fill it in
@@ -171,9 +176,14 @@ class Message(val buffer: ByteBuffer) {
   def hasKey: Boolean = keySize >= 0
   
   /**
+   * The position where the payload size is stored
+   */
+  private def payloadSizeOffset = Message.KeyOffset + max(0, keySize)
+  
+  /**
    * The length of the message value in bytes
    */
-  def payloadSize: Int = size - KeyOffset - max(0, keySize)
+  def payloadSize: Int = buffer.getInt(payloadSizeOffset)
   
   /**
    * The magic version of this message
@@ -194,29 +204,27 @@ class Message(val buffer: ByteBuffer) {
   /**
    * A ByteBuffer containing the content of the message
    */
-  def payload: ByteBuffer = {
-    var payload = buffer.duplicate
-    payload.position(KeyOffset + max(keySize, 0))
-    payload = payload.slice()
-    payload.limit(payloadSize)
-    payload.rewind()
-    payload
-  }
+  def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)
   
   /**
    * A ByteBuffer containing the message key
    */
-  def key: ByteBuffer = {
-    val s = keySize
-    if(s < 0) {
+  def key: ByteBuffer = sliceDelimited(KeySizeOffset)
+  
+  /**
+   * Read a size-delimited byte buffer starting at the given offset
+   */
+  private def sliceDelimited(start: Int): ByteBuffer = {
+    val size = buffer.getInt(start)
+    if(size < 0) {
       null
     } else {
-      var key = buffer.duplicate
-      key.position(KeyOffset)
-      key = key.slice()
-      key.limit(s)
-      key.rewind()
-      key
+      var b = buffer.duplicate
+      b.position(start + 4)
+      b = b.slice()
+      b.limit(size)
+      b.rewind
+      b
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 2c9f2d1..30b1dc3 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -33,7 +33,7 @@ class ProducerConfig private (val props: VerifiableProperties)
   /** This is for bootstrapping and the producer will only use it for getting metadata
    * (topics, partitions and replicas). The socket connections for sending the actual data
    * will be established based on the broker information returned in the metadata. The
-   * format is host1:por1,host2:port2, and the list can be a subset of brokers or
+   * format is host1:port1,host2:port2, and the list can be a subset of brokers or
    * a VIP pointing to a subset of brokers.
    */
   val brokerList = props.getString("broker.list")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index f94415a..5ebd29a 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -41,9 +41,6 @@ trait SyncProducerConfigShared {
   val maxMessageSize = props.getInt("max.message.size", 1000000)
 
   /* the client application sending the producer requests */
-  val correlationId = props.getInt("producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
-
-  /* the client application sending the producer requests */
   val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId)
 
   /*
@@ -61,7 +58,6 @@ trait SyncProducerConfigShared {
 }
 
 object SyncProducerConfig {
-  val DefaultCorrelationId = -1
   val DefaultClientId = ""
   val DefaultRequiredAcks : Short = 0
   val DefaultAckTimeoutMs = 1500

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 9f3e2ea..7d0f609 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -39,7 +39,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   extends EventHandler[K,V] with Logging {
   val isSync = ("sync" == config.producerType)
 
-  val counter = new AtomicInteger(0)
+  val partitionCounter = new AtomicInteger(0)
+  val correlationCounter = new AtomicInteger(0)
   val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
 
   private val lock = new Object()
@@ -191,7 +192,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         "\n Valid values are > 0")
     val partition =
       if(key == null)
-        Utils.abs(counter.getAndIncrement()) % numPartitions
+        Utils.abs(partitionCounter.getAndIncrement()) % numPartitions
       else
         partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
@@ -212,7 +213,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
-      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
+      val producerRequest = new ProducerRequest(correlationCounter.getAndIncrement(), config.clientId, config.requiredAcks,
         config.requestTimeoutMs, messagesPerTopic)
       try {
         val syncProducer = producerPool.getProducer(brokerId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b3dc79d..6c01025 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -88,7 +88,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               case (topicAndPartition, partitionOffsetRequest) =>
                 (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
             }
-            val errorResponse = OffsetResponse(apiRequest.versionId, partitionOffsetResponseMap)
+            val errorResponse = OffsetResponse(apiRequest.versionId, apiRequest.correlationId, partitionOffsetResponseMap)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
             error("error when handling request %s".format(apiRequest), e)
           case RequestKeys.MetadataKey =>
@@ -96,7 +96,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val topicMeatadata = apiRequest.topics.map {
               topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
             }
-            val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata)
+            val errorResponse = TopicMetadataResponse(apiRequest.versionId, topicMeatadata, apiRequest.correlationId)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
             error("error when handling request %s".format(apiRequest), e)
           case RequestKeys.LeaderAndIsrKey =>
@@ -104,7 +104,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val responseMap = apiRequest.partitionStateInfos.map {
               case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
             }
-            val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, responseMap)
+            val errorResponse = LeaderAndIsrResponse(apiRequest.versionId, apiRequest.correlationId, responseMap)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
             error("error when handling request %s".format(apiRequest), e)
           case RequestKeys.StopReplicaKey =>
@@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
             }.toMap
             error("error when handling request %s".format(apiRequest), e)
-            val errorResponse = StopReplicaResponse(apiRequest.versionId, responseMap)
+            val errorResponse = StopReplicaResponse(apiRequest.versionId, apiRequest.correlationId, responseMap)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
         }
     } finally
@@ -127,7 +127,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Handling leader and ISR request " + leaderAndIsrRequest)
     try {
       val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, response, error)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, response, error)
       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
@@ -144,7 +144,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     trace("Handling stop replica request " + stopReplicaRequest)
 
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
-    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response.toMap, error)
+    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, response.toMap, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
 
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
@@ -409,7 +409,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
       }
     })
-    val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap)
+    val response = OffsetResponse(OffsetRequest.CurrentVersion, offsetRequest.correlationId, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
@@ -458,7 +458,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       })
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
-    val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
+    val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 67a0be8..e1c11f2 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -43,8 +43,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
   private def registerBrokerInZk() {
     info("Registering broker " + brokerIdPath)
     val hostName = config.hostName
-    val creatorId = hostName + "-" + System.currentTimeMillis
-    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
+    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 8e40f2b..358c4fd 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -180,9 +180,9 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
-    val broker = new Broker(id, creator, host, port)
+    val broker = new Broker(id, host, port)
     try {
       createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZkString)
     } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 531f32e..391e724 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -75,10 +75,11 @@ object SerializationTestUtils{
     TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
   )
 
-  private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
-  private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
-  private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
-  private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013))
+  private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0)
+  private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1)
+  private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2)
+  private val partitionMetaData3 = new PartitionMetadata(3, Some(brokers.head), replicas = brokers, isr = brokers.tail.tail, errorCode = 3)
   private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
   private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
@@ -94,7 +95,7 @@ object SerializationTestUtils{
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
     val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
                           ((topic2, 0), ErrorMapping.NoError))
-    new LeaderAndIsrResponse(1, responseMap)
+    new LeaderAndIsrResponse(1, 1, responseMap)
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
@@ -104,7 +105,7 @@ object SerializationTestUtils{
   def createTestStopReplicaResponse() : StopReplicaResponse = {
     val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
                           ((topic2, 0), ErrorMapping.NoError))
-    new StopReplicaResponse(1, responseMap.toMap)
+    new StopReplicaResponse(1, 0, responseMap.toMap)
   }
 
   def createTestProducerRequest: ProducerRequest = {
@@ -131,17 +132,17 @@ object SerializationTestUtils{
   )
 
   def createTestOffsetResponse: OffsetResponse = {
-    new OffsetResponse(OffsetRequest.CurrentVersion, collection.immutable.Map(
+    new OffsetResponse(OffsetRequest.CurrentVersion, 0, collection.immutable.Map(
       TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l)))
     )
   }
 
   def createTestTopicMetadataRequest: TopicMetadataRequest = {
-    new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
+    new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2), 1)
   }
 
   def createTestTopicMetadataResponse: TopicMetadataResponse = {
-    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
+    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2), 1)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index bb39e09..246b1ec 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -47,7 +47,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
   val group = "group1"
   val consumer0 = "consumer0"
   val consumedOffset = 5
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
                                                            c.brokerId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index dec0453..021f419 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -40,7 +40,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index b6bab2d..c6ea3b6 100644
--- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -92,7 +92,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     log.flush()
 
     val offsets = log.getOffsetsBefore(OffsetRequest.LatestTime, 10)
-    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
+    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
     val topicAndPartition = TopicAndPartition(topic, part)
@@ -101,7 +101,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
       replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
+    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
 
     // try to fetch using latest offset
     val fetchResponse = simpleConsumer.fetch(
@@ -157,14 +157,14 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val now = time.milliseconds
 
     val offsets = log.getOffsetsBefore(now, 10)
-    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
+    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
+    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 9074ca8..3b5ec7f 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -74,7 +74,7 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def simpleRequest() {
     val socket = connect()
-    val correlationId = SyncProducerConfig.DefaultCorrelationId
+    val correlationId = -1
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index d67abe9..534df19 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -168,8 +168,8 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val props = new Properties()
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    val broker1 = new Broker(0, "localhost", "localhost", 9092)
-    val broker2 = new Broker(1, "localhost", "localhost", 9093)
+    val broker1 = new Broker(0, "localhost", 9092)
+    val broker2 = new Broker(1, "localhost", 9093)
     broker1
     // form expected partitions metadata
     val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
@@ -427,17 +427,18 @@ class AsyncProducerTest extends JUnit3Suite {
     // produce request for topic1 and partitions 0 and 1.  Let the first request fail
     // entirely.  The second request will succeed for partition 1 but fail for partition 0.
     // On the third try for partition 0, let it succeed.
-    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
+    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 0)
+    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 1)
     val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
           (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
-    val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
+    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 2)
     val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
-    EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)
-    EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response2)
+    EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
+    EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)
     EasyMock.replay(mockSyncProducer)
 
     val producerPool = EasyMock.createMock(classOf[ProducerPool])
@@ -510,7 +511,7 @@ class AsyncProducerTest extends JUnit3Suite {
   }
 
   private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
+    val broker1 = new Broker(brokerId, brokerHost, brokerPort)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 744554c..b289dda 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -81,7 +81,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     props.put("connect.timeout.ms", "300")
     props.put("reconnect.interval", "500")
     props.put("max.message.size", "100")
-    val correlationId = SyncProducerConfig.DefaultCorrelationId
+    val correlationId = 0
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
@@ -98,9 +98,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val props = new Properties()
     props.put("host", "localhost")
     props.put("port", server.socketServer.port.toString)
-    props.put("buffer.size", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "500")
+    props.put("max.message.size", 50000.toString)
     val producer = new SyncProducer(new SyncProducerConfig(props))
     CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 19b90a8..fcdd26e 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -123,7 +123,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // start another controller
     val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
-    val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
     val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
     controllerChannelManager.startup()
     val staleControllerEpoch = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1144765/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3d4f3f2..a145f2a 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,6 +24,7 @@ import java.nio.channels._
 import java.util.Random
 import java.util.Properties
 import junit.framework.Assert._
+import kafka.api._
 import kafka.server._
 import kafka.producer._
 import kafka.message._
@@ -333,13 +334,13 @@ object TestUtils extends Logging {
   }
 
   def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667))
-    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.creatorId, b.port))
+    val brokers = ids.map(id => new Broker(id, "localhost", 6667))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port))
     brokers
   }
 
   def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667))
+    val brokers = ids.map(id => new Broker(id, "localhost", 6667))
     brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b))
     brokers
   }
@@ -354,22 +355,27 @@ object TestUtils extends Logging {
   /**
    * Create a wired format request based on simple basic information
    */
-  def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
-    produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
-  }
-
-  def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
-    produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
-  }
-
-  def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet, acks: Int): kafka.api.ProducerRequest = {
-    val correlationId = SyncProducerConfig.DefaultCorrelationId
-    val clientId = SyncProducerConfig.DefaultClientId
-    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+  def produceRequest(topic: String, 
+                     partition: Int, 
+                     message: ByteBufferMessageSet, 
+                     acks: Int = SyncProducerConfig.DefaultRequiredAcks,
+                     timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+                     correlationId: Int = 0,
+                     clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
+    produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId)
+  }
+
+  def produceRequestWithAcks(topics: Seq[String], 
+                             partitions: Seq[Int], 
+                             message: ByteBufferMessageSet, 
+                             acks: Int = SyncProducerConfig.DefaultRequiredAcks, 
+                             timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs,
+                             correlationId: Int = 0,
+                             clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = {
     val data = topics.flatMap(topic =>
       partitions.map(partition => (TopicAndPartition(topic,  partition), message))
     )
-    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
+    new ProducerRequest(correlationId, clientId, acks.toShort, timeout, Map(data:_*))
   }
 
   def makeLeaderForPartition(zkClient: ZkClient, topic: String,


Mime
View raw message