kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject [2/2] git commit: KAFKA-901 Kafka server can become unavailable if clients send several metadata requests; reviewed by Jun Rao
Date Fri, 17 May 2013 22:45:40 GMT
KAFKA-901 Kafka server can become unavailable if clients send several metadata requests; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cfdc403e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cfdc403e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cfdc403e

Branch: refs/heads/0.8
Commit: cfdc403e1780882f62df6e9a092cda6bea017fbe
Parents: 988d4d8
Author: Neha Narkhede <nehanarkhede@apache.org>
Authored: Fri May 17 15:45:33 2013 -0700
Committer: Neha Narkhede <nehanarkhede@apache.org>
Committed: Fri May 17 15:45:33 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala   |   85 ---------
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |   34 ++--
 .../scala/kafka/api/LeaderAndIsrResponse.scala     |    2 +-
 core/src/main/scala/kafka/api/RequestKeys.scala    |    4 +-
 .../main/scala/kafka/api/StopReplicaRequest.scala  |    9 +-
 core/src/main/scala/kafka/api/TopicMetadata.scala  |   28 +++
 .../scala/kafka/api/TopicMetadataRequest.scala     |    2 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala    |  104 +++++++++++
 .../scala/kafka/api/UpdateMetadataResponse.scala   |   44 +++++
 core/src/main/scala/kafka/client/ClientUtils.scala |    7 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala     |    7 +
 .../main/scala/kafka/consumer/ConsumerConfig.scala |    3 +-
 .../kafka/consumer/ConsumerFetcherManager.scala    |   30 ++--
 .../kafka/consumer/ConsumerFetcherThread.scala     |    3 +-
 .../main/scala/kafka/consumer/SimpleConsumer.scala |    6 +-
 .../consumer/ZookeeperConsumerConnector.scala      |    6 +-
 .../controller/ControllerChannelManager.scala      |   47 +++++-
 .../scala/kafka/controller/KafkaController.scala   |   40 ++++-
 .../kafka/controller/PartitionStateMachine.scala   |   12 +-
 .../kafka/controller/ReplicaStateMachine.scala     |   14 +-
 .../main/scala/kafka/network/RequestChannel.scala  |    2 +-
 .../main/scala/kafka/network/SocketServer.scala    |    7 +-
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    9 +-
 .../kafka/producer/async/DefaultEventHandler.scala |    8 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  142 +++++++++++----
 .../scala/kafka/server/KafkaRequestHandler.scala   |   11 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |    6 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    4 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |    2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |   85 +++-------
 .../api/RequestResponseSerializationTest.scala     |    4 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala  |   34 +++-
 .../kafka/integration/AutoOffsetResetTest.scala    |   10 +-
 .../kafka/integration/LazyInitProducerTest.scala   |    9 +-
 .../unit/kafka/integration/PrimitiveApiTest.scala  |    7 +-
 .../unit/kafka/integration/TopicMetadataTest.scala |  125 ++++++-------
 .../unit/kafka/producer/AsyncProducerTest.scala    |   11 +-
 .../scala/unit/kafka/producer/ProducerTest.scala   |   88 +++++----
 .../unit/kafka/server/LeaderElectionTest.scala     |    2 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |    7 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    9 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |    7 +
 42 files changed, 664 insertions(+), 412 deletions(-)
----------------------------------------------------------------------


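The central protocol change, visible in the PartitionStateInfo hunks below, replaces the comma-separated ISR string with a length-prefixed sequence of broker ids and appends the full replica set after the replication factor. A minimal, standalone sketch of that length-prefixed int encoding (hypothetical helper and object names, not the actual Kafka classes):

    import java.nio.ByteBuffer

    object IntSeqWireFormat {
      // Write a sequence of broker ids as a 4-byte count followed by one 4-byte int per id,
      // mirroring how the patched PartitionStateInfo.writeTo serializes the ISR and replica list.
      def writeIntSeq(buffer: ByteBuffer, ids: Seq[Int]) {
        buffer.putInt(ids.size)
        ids.foreach(buffer.putInt(_))
      }

      // Read the count back, then that many ints.
      def readIntSeq(buffer: ByteBuffer): Seq[Int] = {
        val n = buffer.getInt
        for (i <- 0 until n) yield buffer.getInt
      }

      def main(args: Array[String]) {
        val buffer = ByteBuffer.allocate(64)
        writeIntSeq(buffer, Seq(1, 2, 3))   // e.g. isr = brokers 1, 2, 3
        buffer.flip()
        println(readIntSeq(buffer))         // Vector(1, 2, 3)
      }
    }
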
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index c7652ad..41cb764 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,16 +18,12 @@
 package kafka.admin
 
 import java.util.Random
-import kafka.api.{TopicMetadata, PartitionMetadata}
-import kafka.cluster.Broker
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
-import mutable.ListBuffer
 import scala.collection.mutable
 import kafka.common._
-import scala.Some
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -89,87 +85,6 @@ object AdminUtils extends Logging {
     }
   }
 
-  def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
-    fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
-
-  def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
-    val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
-    topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
-  }
-
-  private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
-    if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
-      val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
-      val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
-      val partitionMetadata = sortedPartitions.map { partitionMap =>
-        val partition = partitionMap._1
-        val replicas = partitionMap._2
-        val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
-        val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-        debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
-
-        var leaderInfo: Option[Broker] = None
-        var replicaInfo: Seq[Broker] = Nil
-        var isrInfo: Seq[Broker] = Nil
-        try {
-          leaderInfo = leader match {
-            case Some(l) =>
-              try {
-                Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
-              } catch {
-                case e => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
-              }
-            case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
-          }
-          try {
-            replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
-            isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
-          } catch {
-            case e => throw new ReplicaNotAvailableException(e)
-          }
-          if(replicaInfo.size < replicas.size)
-            throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-              replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-          if(isrInfo.size < inSyncReplicas.size)
-            throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-              inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-          new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-        } catch {
-          case e =>
-            debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
-            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-        }
-      }
-      new TopicMetadata(topic, partitionMetadata)
-    } else {
-      // topic doesn't exist, send appropriate error code
-      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
-    }
-  }
-
-  private def getBrokerInfoFromCache(zkClient: ZkClient,
-                                     cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
-                                     brokerIds: Seq[Int]): Seq[Broker] = {
-    var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
-    val brokerMetadata = brokerIds.map { id =>
-      val optionalBrokerInfo = cachedBrokerInfo.get(id)
-      optionalBrokerInfo match {
-        case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
-        case None => // fetch it from zookeeper
-          ZkUtils.getBrokerInfo(zkClient, id) match {
-            case Some(brokerInfo) =>
-              cachedBrokerInfo += (id -> brokerInfo)
-              Some(brokerInfo)
-            case None =>
-              failedBrokerIds += id
-              None
-          }
-      }
-    }
-    brokerMetadata.filter(_.isDefined).map(_.get)
-  }
-
   private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
     val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
     (firstReplicaIndex + shift) % nBrokers

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 68e64d6..a474474 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -50,23 +50,29 @@ object PartitionStateInfo {
     val controllerEpoch = buffer.getInt
     val leader = buffer.getInt
     val leaderEpoch = buffer.getInt
-    val isrString = readShortString(buffer)
-    val isr = Utils.parseCsvList(isrString).map(_.toInt).toList
+    val isrSize = buffer.getInt
+    val isr = for(i <- 0 until isrSize) yield buffer.getInt
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
-    PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), controllerEpoch),
-      replicationFactor)
+    val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
+    PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList, zkVersion), controllerEpoch),
+                       replicas.toSet)
   }
 }
 
-case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, val replicationFactor: Int) {
+case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                              val allReplicas: Set[Int]) {
+  def replicationFactor = allReplicas.size
+
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(leaderIsrAndControllerEpoch.controllerEpoch)
     buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leader)
     buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch)
-    writeShortString(buffer, leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(","))
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
+    leaderIsrAndControllerEpoch.leaderAndIsr.isr.foreach(buffer.putInt(_))
     buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion)
     buffer.putInt(replicationFactor)
+    allReplicas.foreach(buffer.putInt(_))
   }
 
   def sizeInBytes(): Int = {
@@ -74,9 +80,11 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
       4 /* epoch of the controller that elected the leader */ +
       4 /* leader broker id */ +
       4 /* leader epoch */ +
-      (2 + leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(",").length) +
+      4 /* number of replicas in isr */ +
+      4 * leaderIsrAndControllerEpoch.leaderAndIsr.isr.size /* replicas in isr */ +
       4 /* zk version */ +
-      4 /* replication factor */
+      4 /* replication factor */ +
+      allReplicas.size * 4
     size
   }
   
@@ -84,6 +92,7 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
     val partitionStateInfo = new StringBuilder
     partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
     partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
+    partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
     partitionStateInfo.toString()
   }
 }
@@ -98,7 +107,6 @@ object LeaderAndIsrRequest {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
-    val ackTimeoutMs = buffer.getInt
     val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
@@ -117,14 +125,13 @@ object LeaderAndIsrRequest {
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)
 
-    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders)
+    new LeaderAndIsrRequest(versionId, correlationId, clientId, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders)
   }
 }
 
 case class LeaderAndIsrRequest (versionId: Short,
                                 override val correlationId: Int,
                                 clientId: String,
-                                ackTimeoutMs: Int,
                                 controllerId: Int,
                                 controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
@@ -133,7 +140,7 @@ case class LeaderAndIsrRequest (versionId: Short,
 
   def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], aliveLeaders: Set[Broker], controllerId: Int,
            controllerEpoch: Int, correlationId: Int, clientId: String) = {
-    this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, LeaderAndIsrRequest.DefaultAckTimeout,
+    this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId,
          controllerId, controllerEpoch, partitionStateInfos, aliveLeaders)
   }
 
@@ -141,7 +148,6 @@ case class LeaderAndIsrRequest (versionId: Short,
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
-    buffer.putInt(ackTimeoutMs)
     buffer.putInt(controllerId)
     buffer.putInt(controllerEpoch)
     buffer.putInt(partitionStateInfos.size)
@@ -159,7 +165,6 @@ case class LeaderAndIsrRequest (versionId: Short,
       2 /* version id */ +
       4 /* correlation id */ + 
       (2 + clientId.length) /* client id */ +
-      4 /* ack timeout */ +
       4 /* controller id */ +
       4 /* controller epoch */ +
       4 /* number of partitions */
@@ -179,7 +184,6 @@ case class LeaderAndIsrRequest (versionId: Short,
     leaderAndIsrRequest.append(";ControllerEpoch:" + controllerEpoch)
     leaderAndIsrRequest.append(";CorrelationId:" + correlationId)
     leaderAndIsrRequest.append(";ClientId:" + clientId)
-    leaderAndIsrRequest.append(";AckTimeoutMs:" + ackTimeoutMs + " ms")
     leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     leaderAndIsrRequest.append(";Leaders:" + aliveLeaders.mkString(","))
     leaderAndIsrRequest.toString()

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index b4cfae8..378b2b3 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -63,7 +63,7 @@ case class LeaderAndIsrResponse(override val correlationId: Int,
     buffer.putInt(correlationId)
     buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
-    for ((key:(String, Int), value) <- responseMap){
+    for ((key:(String, Int), value) <- responseMap) {
       writeShortString(buffer, key._1)
       buffer.putInt(key._2)
       buffer.putShort(value)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index b000eb7..541cf84 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -27,6 +27,7 @@ object RequestKeys {
   val MetadataKey: Short = 3
   val LeaderAndIsrKey: Short = 4
   val StopReplicaKey: Short = 5
+  val UpdateMetadataKey: Short = 6
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -34,7 +35,8 @@ object RequestKeys {
         OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
         MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
         LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
-        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom))
+        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
+        UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom))
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index cd55db4..efd7046 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -35,7 +35,6 @@ object StopReplicaRequest extends Logging {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
-    val ackTimeoutMs = buffer.getInt
     val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val deletePartitions = buffer.get match {
@@ -49,7 +48,7 @@ object StopReplicaRequest extends Logging {
     (1 to topicPartitionPairCount) foreach { _ =>
       topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
     }
-    StopReplicaRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch,
+    StopReplicaRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
                        deletePartitions, topicPartitionPairSet.toSet)
   }
 }
@@ -57,7 +56,6 @@ object StopReplicaRequest extends Logging {
 case class StopReplicaRequest(versionId: Short,
                               override val correlationId: Int,
                               clientId: String,
-                              ackTimeoutMs: Int,
                               controllerId: Int,
                               controllerEpoch: Int,
                               deletePartitions: Boolean,
@@ -65,7 +63,7 @@ case class StopReplicaRequest(versionId: Short,
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
 
   def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
-    this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
+    this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,
          controllerId, controllerEpoch, deletePartitions, partitions)
   }
 
@@ -73,7 +71,6 @@ case class StopReplicaRequest(versionId: Short,
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
-    buffer.putInt(ackTimeoutMs)
     buffer.putInt(controllerId)
     buffer.putInt(controllerEpoch)
     buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
@@ -89,7 +86,6 @@ case class StopReplicaRequest(versionId: Short,
       2 + /* versionId */
       4 + /* correlation id */
       ApiUtils.shortStringLength(clientId) +
-      4 + /* ackTimeoutMs */
       4 + /* controller id*/
       4 + /* controller epoch */
       1 + /* deletePartitions */
@@ -107,7 +103,6 @@ case class StopReplicaRequest(versionId: Short,
     stopReplicaRequest.append("; Version: " + versionId)
     stopReplicaRequest.append("; CorrelationId: " + correlationId)
     stopReplicaRequest.append("; ClientId: " + clientId)
-    stopReplicaRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
     stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
     stopReplicaRequest.append("; ControllerId: " + controllerId)
     stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index a0d68c5..c7f7c70 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -55,6 +55,34 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat
     buffer.putInt(partitionsMetadata.size)
     partitionsMetadata.foreach(m => m.writeTo(buffer))
   }
+
+  override def toString(): String = {
+    val topicMetadataInfo = new StringBuilder
+    topicMetadataInfo.append("{TopicMetadata for topic %s -> ".format(topic))
+    errorCode match {
+      case ErrorMapping.NoError =>
+        partitionsMetadata.foreach { partitionMetadata =>
+          partitionMetadata.errorCode match {
+            case ErrorMapping.NoError =>
+              topicMetadataInfo.append("\nMetadata for partition [%s,%d] is %d".format(topic,
+                partitionMetadata.partitionId, partitionMetadata.toString()))
+            case ErrorMapping.ReplicaNotAvailableCode =>
+              // this error message means some replica other than the leader is not available. The consumer
+              // doesn't care about non leader replicas, so ignore this
+              topicMetadataInfo.append("\nMetadata for partition [%s,%d] is %d".format(topic,
+                partitionMetadata.partitionId, partitionMetadata.toString()))
+            case _ =>
+              topicMetadataInfo.append("\nMetadata for partition [%s,%d] is not available due to %s".format(topic,
+                partitionMetadata.partitionId, ErrorMapping.exceptionFor(partitionMetadata.errorCode).getClass.getName))
+          }
+        }
+      case _ =>
+        topicMetadataInfo.append("\nNo partition metadata for topic %s due to %s".format(topic,
+                                 ErrorMapping.exceptionFor(errorCode).getClass.getName))
+    }
+    topicMetadataInfo.append("}")
+    topicMetadataInfo.toString()
+  }
 }
 
 object PartitionMetadata {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 7477cfd..c5221c4 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -81,7 +81,7 @@ case class TopicMetadataRequest(val versionId: Short,
     topicMetadataRequest.toString()
   }
 
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val topicMetadata = topics.map {
       topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
new file mode 100644
index 0000000..a797a64
--- /dev/null
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -0,0 +1,104 @@
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import kafka.cluster.Broker
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+
+object UpdateMetadataRequest {
+  val CurrentVersion = 0.shortValue
+  val IsInit: Boolean = true
+  val NotInit: Boolean = false
+  val DefaultAckTimeout: Int = 1000
+
+  def readFrom(buffer: ByteBuffer): UpdateMetadataRequest = {
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+    val controllerId = buffer.getInt
+    val controllerEpoch = buffer.getInt
+    val partitionStateInfosCount = buffer.getInt
+    val partitionStateInfos = new collection.mutable.HashMap[TopicAndPartition, PartitionStateInfo]
+
+    for(i <- 0 until partitionStateInfosCount){
+      val topic = readShortString(buffer)
+      val partition = buffer.getInt
+      val partitionStateInfo = PartitionStateInfo.readFrom(buffer)
+
+      partitionStateInfos.put(TopicAndPartition(topic, partition), partitionStateInfo)
+    }
+
+    val numAliveBrokers = buffer.getInt
+    val aliveBrokers = for(i <- 0 until numAliveBrokers) yield Broker.readFrom(buffer)
+    new UpdateMetadataRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
+      partitionStateInfos.toMap, aliveBrokers.toSet)
+  }
+}
+
+case class UpdateMetadataRequest (versionId: Short,
+                                  override val correlationId: Int,
+                                  clientId: String,
+                                  controllerId: Int,
+                                  controllerEpoch: Int,
+                                  partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo],
+                                  aliveBrokers: Set[Broker])
+  extends RequestOrResponse(Some(RequestKeys.UpdateMetadataKey), correlationId) {
+
+  def this(controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String,
+           partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo], aliveBrokers: Set[Broker]) = {
+    this(UpdateMetadataRequest.CurrentVersion, correlationId, clientId,
+      controllerId, controllerEpoch, partitionStateInfos, aliveBrokers)
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+    buffer.putInt(controllerId)
+    buffer.putInt(controllerEpoch)
+    buffer.putInt(partitionStateInfos.size)
+    for((key, value) <- partitionStateInfos){
+      writeShortString(buffer, key.topic)
+      buffer.putInt(key.partition)
+      value.writeTo(buffer)
+    }
+    buffer.putInt(aliveBrokers.size)
+    aliveBrokers.foreach(_.writeTo(buffer))
+  }
+
+  def sizeInBytes(): Int = {
+    var size =
+      2 /* version id */ +
+        4 /* correlation id */ +
+        (2 + clientId.length) /* client id */ +
+        4 /* controller id */ +
+        4 /* controller epoch */ +
+        4 /* number of partitions */
+    for((key, value) <- partitionStateInfos)
+      size += (2 + key.topic.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
+    size += 4 /* number of alive brokers in the cluster */
+    for(broker <- aliveBrokers)
+      size += broker.sizeInBytes /* broker info */
+    size
+  }
+
+  override def toString(): String = {
+    val updateMetadataRequest = new StringBuilder
+    updateMetadataRequest.append("Name:" + this.getClass.getSimpleName)
+    updateMetadataRequest.append(";Version:" + versionId)
+    updateMetadataRequest.append(";Controller:" + controllerId)
+    updateMetadataRequest.append(";ControllerEpoch:" + controllerEpoch)
+    updateMetadataRequest.append(";CorrelationId:" + correlationId)
+    updateMetadataRequest.append(";ClientId:" + clientId)
+    updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
+    updateMetadataRequest.append(";AliveBrokers:" + aliveBrokers.mkString(","))
+    updateMetadataRequest.toString()
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]))
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
new file mode 100644
index 0000000..b1e42c3
--- /dev/null
+++ b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import kafka.common.{TopicAndPartition, ErrorMapping}
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import collection.mutable.HashMap
+import collection.Map
+
+
+object UpdateMetadataResponse {
+  def readFrom(buffer: ByteBuffer): UpdateMetadataResponse = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    new UpdateMetadataResponse(correlationId, errorCode)
+  }
+}
+
+case class UpdateMetadataResponse(override val correlationId: Int,
+                                  errorCode: Short = ErrorMapping.NoError)
+  extends RequestOrResponse(correlationId = correlationId) {
+  def sizeInBytes(): Int = 4 /* correlation id */ + 2 /* error code */
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 025d3ab..ebe4845 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -47,7 +47,7 @@ object ClientUtils extends Logging{
         producer.close()
       }
     }
-    if(!fetchMetaDataSucceeded){
+    if(!fetchMetaDataSucceeded) {
       throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
     } else {
       debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
@@ -62,13 +62,14 @@ object ClientUtils extends Logging{
    * @param clientId The client's identifier
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
+                         correlationId: Int = 0): TopicMetadataResponse = {
     val props = new Properties()
     props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
     props.put("client.id", clientId)
     props.put("request.timeout.ms", timeoutMs.toString)
     val producerConfig = new ProducerConfig(props)
-    fetchTopicMetadata(topics, brokers, producerConfig, 0)
+    fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index e2b0041..89ff382 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -84,6 +84,11 @@ object ConsoleConsumer extends Logging {
             .describedAs("ms")
             .ofType(classOf[java.lang.Integer])
             .defaultsTo(ConsumerConfig.SocketTimeout)
+    val refreshMetadataBackoffMsOpt = parser.accepts("refresh-leader-backoff-ms", "Backoff time before refreshing metadata")
+            .withRequiredArg
+            .describedAs("ms")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(ConsumerConfig.RefreshMetadataBackoffMs)
     val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " +
             "of time without incoming messages")
             .withRequiredArg
@@ -160,6 +165,8 @@ object ConsoleConsumer extends Logging {
     props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
     props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
+    props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString)
+
     val config = new ConsumerConfig(props)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index e9cfd10..9e9a8bc 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -23,6 +23,7 @@ import kafka.utils._
 import kafka.common.{InvalidConfigException, Config}
 
 object ConsumerConfig extends Config {
+  val RefreshMetadataBackoffMs = 200
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
   val FetchSize = 1024 * 1024
@@ -115,7 +116,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
 
   /** backoff time to refresh the leader of a partition after it loses the current leader */
-  val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", 200)
+  val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
 
   /* what to do if an offset is out of range.
      smallest : automatically reset the offset to the smallest offset

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index c6250dc..658b5c1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -28,6 +28,7 @@ import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
 import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
+import java.util.concurrent.atomic.AtomicInteger
 
 /**
  *  Usage:
@@ -44,6 +45,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
   private var leaderFinderThread: ShutdownableThread = null
+  private val correlationId = new AtomicInteger(0)
 
   private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
@@ -61,22 +63,22 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                               brokers,
                                                               config.clientId,
-                                                              config.socketTimeoutMs).topicsMetadata
+                                                              config.socketTimeoutMs,
+                                                              correlationId.getAndIncrement).topicsMetadata
+          if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
           val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
-          topicsMetadata.foreach(
-            tmd => {
-              val topic = tmd.topic
-              tmd.partitionsMetadata.foreach(
-              pmd => {
-                val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
-                if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
-                  val leaderBroker = pmd.leader.get
-                  leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
-                }
-              })
-            })
+          topicsMetadata.foreach { tmd =>
+            val topic = tmd.topic
+            tmd.partitionsMetadata.foreach { pmd =>
+              val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
+              if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
+                val leaderBroker = pmd.leader.get
+                leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
+              }
+            }
+          }
 
-          leaderForPartitionsMap.foreach{
+          leaderForPartitionsMap.foreach {
             case(topicAndPartition, leaderBroker) =>
               val pti = partitionMap(topicAndPartition)
               try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 5f9c902..1270e92 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -20,9 +20,8 @@ package kafka.consumer
 import kafka.cluster.Broker
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData}
+import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.TopicAndPartition
-import kafka.common.ErrorMapping
 
 
 class ConsumerFetcherThread(name: String,

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 1fbdfc3..bdeee91 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -20,11 +20,7 @@ package kafka.consumer
 import kafka.api._
 import kafka.network._
 import kafka.utils._
-import kafka.utils.ZkUtils._
-import collection.immutable
-import kafka.common.{ErrorMapping, TopicAndPartition, KafkaException}
-import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.Broker
+import kafka.common.{ErrorMapping, TopicAndPartition}
 
 /**
  * A consumer of kafka messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 398618f..e66680b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -303,6 +303,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
+    private val correlationId = new AtomicInteger(0)
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
@@ -403,9 +404,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
                                                           brokers,
                                                           config.clientId,
-                                                          config.socketTimeoutMs).topicsMetadata
+                                                          config.socketTimeoutMs,
+                                                          correlationId.getAndIncrement).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
-      topicsMetadata.foreach(m =>{
+      topicsMetadata.foreach(m => {
         val topic = m.topic
         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
         partitionsPerTopicMap.put(topic, partitions)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3164f78..7e8ae29 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -25,6 +25,7 @@ import kafka.server.KafkaConfig
 import collection.mutable
 import kafka.api._
 import org.apache.log4j.Logger
+import kafka.common.TopicAndPartition
 
 class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -75,6 +76,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
+    debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id))
     val channel = new BlockingChannel(broker.host, broker.port,
       BlockingChannel.UseDefaultBufferSize,
       BlockingChannel.UseDefaultBufferSize,
@@ -144,12 +146,13 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,
+class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,
                                    controllerId: Int, clientId: String)
   extends  Logging {
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
   val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+  val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]]
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   def newBatch() {
@@ -159,15 +162,19 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
         "a new one. Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
+    updateMetadataRequestMap.clear()
     stopAndDeleteReplicaRequestMap.clear()
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
-                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) {
+                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                       replicas: Seq[Int]) {
     brokerIds.foreach { brokerId =>
       leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
-      leaderAndIsrRequestMap(brokerId).put((topic, partition), PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
+      leaderAndIsrRequestMap(brokerId).put((topic, partition),
+        PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
     }
+    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(TopicAndPartition(topic, partition)))
   }
 
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean) {
@@ -185,6 +192,30 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
     }
   }
 
+  def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
+                                         partitions:scala.collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+    val partitionList =
+    if(partitions.isEmpty) {
+      controllerContext.partitionLeadershipInfo.keySet
+    } else {
+      partitions
+    }
+    partitionList.foreach { partition =>
+      val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
+      leaderIsrAndControllerEpochOpt match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
+          val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
+          brokerIds.foreach { brokerId =>
+            updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
+            updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
+          }
+        case None =>
+          info("Leader not assigned yet for partition %s. Skip sending udpate metadata request".format(partition))
+      }
+    }
+  }
+
   def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int, liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
@@ -202,6 +233,16 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
+    updateMetadataRequestMap.foreach { m =>
+      val broker = m._1
+      val partitionStateInfos = m._2.toMap
+      val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
+                                                            partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
+      partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request with " +
+        "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, correlationId, broker, p._1)))
+      sendRequest(broker, updateMetadataRequest, null)
+    }
+    updateMetadataRequestMap.clear()
     Seq((stopReplicaRequestMap, false), (stopAndDeleteReplicaRequestMap, true)) foreach {
       case(m, deletePartitions) => {
         m foreach {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 02510bd..f334685 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -32,9 +32,9 @@ import kafka.utils.{Utils, ZkUtils, Logging}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+import java.util.concurrent.atomic.AtomicInteger
 import scala.Some
 import kafka.common.TopicAndPartition
-import java.util.concurrent.atomic.AtomicInteger
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
@@ -65,7 +65,8 @@ class ControllerContext(val zkClient: ZkClient,
   def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
   def liveBrokerIds = liveBrokerIdsUnderlying.filter(brokerId => !shuttingDownBrokerIds.contains(brokerId))
 
-  def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying ++ shuttingDownBrokerIds
+  def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
+  def liveOrShuttingDownBrokers = liveBrokersUnderlying
 }
 
 trait KafkaControllerMBean {
@@ -91,7 +92,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, this.config.brokerId, this.clientId)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controllerContext, sendRequest, this.config.brokerId, this.clientId)
   registerControllerChangedListener()
 
   newGauge(
@@ -159,16 +160,17 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       }
 
       val partitionsToMove = replicatedPartitionsBrokerLeads().toSet
+
       debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(",")))
 
-      partitionsToMove.foreach{ topicAndPartition =>
+      partitionsToMove.foreach { topicAndPartition =>
         val (topic, partition) = topicAndPartition.asTuple
         // move leadership serially to relinquish lock.
         controllerContext.controllerLock synchronized {
           controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
             if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
               partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                                                       controlledShutdownPartitionLeaderSelector)
+                controlledShutdownPartitionLeaderSelector)
               val newLeaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(topicAndPartition)
 
               // mark replica offline only if leadership was moved successfully
@@ -200,7 +202,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                 case Some(updatedLeaderIsrAndControllerEpoch) =>
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
                     Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
-                    updatedLeaderIsrAndControllerEpoch, replicationFactor)
+                    updatedLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(topicAndPartition))
                 case None =>
                 // ignore
               }
@@ -244,6 +246,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
       initializeAndMaybeTriggerPartitionReassignment()
       initializeAndMaybeTriggerPreferredReplicaElection()
+      /* send partition leadership info to all live brokers */
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -275,13 +279,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
 
     val newBrokersSet = newBrokers.toSet
+    // send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown
+    // leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the
+    // metadata will reach the new brokers faster
+    sendUpdateMetadataRequest(newBrokers)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
     // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
     // to see if these brokers can become leaders for some/all of those
     partitionStateMachine.triggerOnlinePartitionStateChange()
-
     // check if reassignment of some partitions need to be restarted
     val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter{
       case (topicAndPartition, reassignmentContext) =>
@@ -379,6 +386,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         removePartitionFromReassignedPartitions(topicAndPartition)
         info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
         controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
+        // after electing leader, the replicas and isr information changes, so resend the update metadata request
+        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
       case false =>
         info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned not yet caught up with the leader")
@@ -663,6 +672,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   /**
+   * Send the leader information for selected partitions to selected brokers so that they can correctly respond to
+   * metadata requests
+   * @param brokers The brokers that the update metadata request should be sent to
+   * @param partitions The partitions for which the metadata is to be sent
+   */
+  private def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+    brokerRequestBatch.newBatch()
+    brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
+    brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+  }
+
+  /**
    * Removes a given partition replica from the ISR; if it is not the current
    * leader and there are sufficient remaining replicas in ISR.
    * @param topic topic
@@ -673,7 +694,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    debug("Removing replica %d from ISR of %s.".format(replicaId, topicAndPartition))
+    debug("Removing replica %d from ISR %s for partition %s.".format(replicaId,
+      controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.isr.mkString(","), topicAndPartition))
     var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
@@ -701,6 +723,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             newLeaderAndIsr.zkVersion = newVersion
 
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
+            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
             if (updateSucceeded)
               info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString()))
             updateSucceeded
@@ -708,6 +731,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
                  .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
+            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
             true
           }
         case None =>

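For illustration, the new sendUpdateMetadataRequest helper above follows the controller's batch-then-send pattern: start a fresh batch, queue one update metadata request per target broker, then flush the batch to the brokers. Below is a minimal, self-contained sketch of that pattern; the types and names are simplified stand-ins for illustration, not the actual controller classes.

    import scala.collection.mutable

    case class TopicAndPartition(topic: String, partition: Int)

    class RequestBatch {
      // per-broker set of partitions whose metadata should be pushed
      private val updateMetadataTargets = mutable.Map.empty[Int, mutable.Set[TopicAndPartition]]

      def newBatch(): Unit = updateMetadataTargets.clear()

      // queue an update-metadata request for each target broker
      def addUpdateMetadataRequestForBrokers(brokers: Seq[Int], partitions: Set[TopicAndPartition]): Unit =
        brokers.foreach { b =>
          updateMetadataTargets.getOrElseUpdate(b, mutable.Set.empty[TopicAndPartition]) ++= partitions
        }

      // the real controller serializes and sends these over the wire; here we just print them
      def sendRequestsToBrokers(epoch: Int, correlationId: Int): Unit =
        updateMetadataTargets.foreach { case (broker, parts) =>
          println("epoch=%d correlation=%d -> broker %d: %d partitions".format(epoch, correlationId, broker, parts.size))
        }
    }

    object BatchExample extends App {
      val batch = new RequestBatch
      batch.newBatch()
      batch.addUpdateMetadataRequestForBrokers(Seq(1, 2, 3), Set(TopicAndPartition("test", 0)))
      batch.sendRequestsToBrokers(epoch = 5, correlationId = 42)
    }
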
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c017727..e3af0c3 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -43,7 +43,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
+    controllerId, controller.clientId)
   private val hasStarted = new AtomicBoolean(false)
   private val hasShutdown = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
@@ -233,7 +234,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param topicAndPartition   The topic/partition whose leader and isr path is to be initialized
    */
   private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
-    debug("Initializing leader and isr for partition %s".format(topicAndPartition))
     val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
     liveAssignedReplicas.size match {
@@ -249,6 +249,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val leader = liveAssignedReplicas.head
         val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
           controller.epoch)
+        debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
@@ -256,9 +257,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
-            topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
           controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
+            topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
         } catch {
           case e: ZkNodeExistsException =>
             // read the controller epoch
@@ -316,9 +317,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
       stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
                                 .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
+      val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
-        newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
+        newLeaderIsrAndControllerEpoch, replicas)
     } catch {
       case lenne: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index bea1644..e237805 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -42,7 +42,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
+    controllerId, controller.clientId)
   private val hasStarted = new AtomicBoolean(false)
   private val hasShutdown = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
@@ -121,7 +122,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                 throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                   .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
               brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                                                                  topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
+                                                                  topic, partition, leaderIsrAndControllerEpoch,
+                                                                  replicaAssignment)
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
@@ -152,7 +154,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                 case Some(leaderIsrAndControllerEpoch) =>
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
-                    replicaAssignment.size)
+                    replicaAssignment)
                   replicaState.put((topic, partition, replicaId), OnlineReplica)
                   stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
@@ -173,8 +175,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                     case Some(updatedLeaderIsrAndControllerEpoch) =>
                       // send the shrunk ISR state change request only to the leader
                       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
-                        topic, partition, updatedLeaderIsrAndControllerEpoch,
-                        replicaAssignment.size)
+                        topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                       replicaState.put((topic, partition, replicaId), OfflineReplica)
                       stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
                                                 .format(controllerId, controller.epoch, replicaId, topicAndPartition))
@@ -245,7 +246,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
               val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
-              val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+              val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
+              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
               val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
               controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
               info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7b8d1f0..1437496 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -49,7 +49,7 @@ object RequestChannel extends Logging {
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer = null
     private val requestLogger = Logger.getLogger("kafka.request.logger")
-    trace("Received request : %s".format(requestObj))
+    trace("Processor %d received request : %s".format(processor, requestObj))
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 5a44c28..d5bd143 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -61,20 +61,19 @@ class SocketServer(val brokerId: Int,
     this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize)
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
-    info("started")
+    info("Started")
   }
 
   /**
    * Shutdown the socket server
    */
   def shutdown() = {
-    info("shutting down")
+    info("Shutting down")
     if(acceptor != null)
       acceptor.shutdown()
     for(processor <- processors)
       processor.shutdown()
-    requestChannel.shutdown
-    info("shut down completely")
+    info("Shutdown completed")
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 82e6e4d..13a8aa6 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -54,6 +54,13 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
           }
       }
     val partitionMetadata = metadata.partitionsMetadata
+    if(partitionMetadata.size == 0) {
+      if(metadata.errorCode != ErrorMapping.NoError) {
+        throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
+      } else {
+        throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
+      }
+    }
     partitionMetadata.map { m =>
       m.leader match {
         case Some(leader) =>
@@ -77,7 +84,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{
       trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
-      if(tmd.errorCode == ErrorMapping.NoError){
+      if(tmd.errorCode == ErrorMapping.NoError) {
         topicPartitionInfo.put(tmd.topic, tmd)
       } else
         warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))

http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 89cb27d..1a74951 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -52,7 +52,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   def handle(events: Seq[KeyedMessage[K,V]]) {
     lock synchronized {
       val serializedData = serialize(events)
-      serializedData.foreach{
+      serializedData.foreach {
         keyed =>
           val dataSize = keyed.message.payloadSize
           producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
@@ -61,6 +61,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.messageSendMaxRetries + 1
       val correlationIdStart = correlationId.get()
+      debug("Handling %d events".format(events.size))
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
         topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
         if (topicMetadataRefreshInterval >= 0 &&
@@ -70,7 +71,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           lastTopicMetadataRefreshTime = SystemTime.milliseconds
         }
         outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
-        if (outstandingProduceRequests.size > 0)  {
+        if (outstandingProduceRequests.size > 0) {
+          info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
           // back off and update the topic metadata cache before attempting another send operation
           Thread.sleep(config.retryBackoffMs)
           // get topics of the outstanding produce requests and refresh metadata for those
@@ -177,7 +179,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
       case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None
       case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None
-      case oe => error("Failed to collate messages by topic, partition due to", oe); throw oe
+      case oe => error("Failed to collate messages by topic, partition due to", oe); None
     }
   }
 

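The extra logging added above sits inside the producer's existing retry loop: requests that fail to dispatch are retried after retryBackoffMs, with topic metadata refreshed in between, until messageSendMaxRetries is exhausted. A rough, self-contained sketch of that loop follows; the dispatch function and the sample data are invented for illustration and are not the handler's real API.

    object RetrySketch extends App {
      val messageSendMaxRetries = 3
      val retryBackoffMs = 100L

      // stand-in for dispatchSerializedData: returns the requests that still failed
      def dispatch(msgs: Seq[Int]): Seq[Int] = msgs.filter(_ % 2 != 0)

      var outstanding: Seq[Int] = Seq(1, 2, 3, 4)
      var remainingRetries = messageSendMaxRetries + 1
      while (remainingRetries > 0 && outstanding.nonEmpty) {
        outstanding = dispatch(outstanding)
        if (outstanding.nonEmpty) {
          println("Back off for %d ms before retrying send. Remaining retries = %d".format(retryBackoffMs, remainingRetries - 1))
          Thread.sleep(retryBackoffMs)  // back off; the real handler also refreshes topic metadata here
        }
        remainingRetries -= 1
      }
      if (outstanding.nonEmpty)
        println("Failed to send after retries: " + outstanding.mkString(","))
    }
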
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f5288bf..7642179 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,11 +17,10 @@
 
 package kafka.server
 
-import kafka.admin.{CreateTopicCommand, AdminUtils}
+import kafka.admin.CreateTopicCommand
 import kafka.api._
 import kafka.message._
 import kafka.network._
-import org.apache.log4j.Logger
 import scala.collection._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
@@ -30,6 +29,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.common._
 import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
+import kafka.cluster.Broker
 
 
 /**
@@ -45,7 +45,13 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val fetchRequestPurgatory =
     new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
   private val delayedRequestMetrics = new DelayedRequestMetrics
-
+  /* following 3 data structures are updated by the update metadata request
+  * and is queried by the topic metadata request. */
+  var leaderCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
+    new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
+//  private var allBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+  private var aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+  private val partitionMetadataLock = new Object
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   /**
@@ -61,6 +67,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
         case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
         case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
       }
     } catch {
@@ -84,7 +91,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-
   def handleStopReplicaRequest(request: RequestChannel.Request) {
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
@@ -93,6 +99,29 @@ class KafkaApis(val requestChannel: RequestChannel,
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
   }
 
+  def handleUpdateMetadataRequest(request: RequestChannel.Request) {
+    val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
+    if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
+      val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
+        "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId,
+        updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
+        replicaManager.controllerEpoch)
+      replicaManager.stateChangeLogger.warn(stateControllerEpochErrorMessage)
+      throw new ControllerMovedException(stateControllerEpochErrorMessage)
+    }
+    partitionMetadataLock synchronized {
+      replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
+      // cache the list of alive brokers in the cluster
+      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+      updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
+        leaderCache.put(partitionState._1, partitionState._2)
+        debug("Caching leader info %s for partition %s".format(partitionState._2, partitionState._1))
+      }
+    }
+    val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
+  }
+
   /**
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
@@ -390,46 +419,87 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = replicaManager.config
-    val uniqueTopics = {
+    var uniqueTopics = Set.empty[String]
+    uniqueTopics = {
       if(metadataRequest.topics.size > 0)
         metadataRequest.topics.toSet
       else
         ZkUtils.getAllTopics(zkClient).toSet
     }
-    val topicMetadataList = AdminUtils.fetchTopicMetadataFromZk(uniqueTopics, zkClient)
-    topicMetadataList.foreach(
-      topicAndMetadata => {
-        topicAndMetadata.errorCode match {
-          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata
-          case ErrorMapping.UnknownTopicOrPartitionCode =>
-            try {
-              /* check if auto creation of topics is turned on */
-              if (config.autoCreateTopicsEnable) {
-                try {
-                  CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
-                  info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                               .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
-                } catch {
-                  case e: TopicExistsException => // let it go, possibly another broker created this topic
-                }
-                val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicAndMetadata.topic, zkClient)
-                topicsMetadata += newTopicMetadata
-                newTopicMetadata.errorCode match {
-                  case ErrorMapping.NoError =>
-                  case _ => throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topicAndMetadata.topic))
-                }
+    val topicMetadataList =
+      partitionMetadataLock synchronized {
+        uniqueTopics.map { topic =>
+          if(leaderCache.keySet.map(_.topic).contains(topic)) {
+            val partitionStateInfo = leaderCache.filter(p => p._1.topic.equals(topic))
+            val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
+            val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
+              val replicas = leaderCache(topicAndPartition).allReplicas
+              var replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+              val partitionStateOpt = leaderCache.get(topicAndPartition)
+              var leaderInfo: Option[Broker] = None
+              var isrInfo: Seq[Broker] = Nil
+              partitionStateOpt match {
+                case Some(partitionState) =>
+                  val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+                  val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+                  val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+                  debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+                  try {
+                    if(aliveBrokers.keySet.contains(leader))
+                      leaderInfo = Some(aliveBrokers(leader))
+                    else throw new LeaderNotAvailableException("Leader not available for partition %s".format(topicAndPartition))
+                    isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                    if(replicaInfo.size < replicas.size)
+                      throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                        replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                    if(isrInfo.size < isr.size)
+                      throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                        isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                    new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+                  } catch {
+                    case e =>
+                      error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
+                      new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
+                        ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+                  }
+                case None => // it is possible that for a newly created topic/partition, its replicas are assigned, but a
+                  // leader hasn't been assigned yet
+                  debug("%s".format(topicAndPartition) + ";replicas = " + replicas + ", in sync replicas = None, leader = None")
+                  new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.LeaderNotAvailableCode)
               }
-            } catch {
-              case e => error("Error while retrieving topic metadata", e)
             }
-          case _ => 
-            error("Error while fetching topic metadata for topic " + topicAndMetadata.topic,
-                  ErrorMapping.exceptionFor(topicAndMetadata.errorCode).getCause)
-            topicsMetadata += topicAndMetadata
+            new TopicMetadata(topic, partitionMetadata)
+          } else {
+            // topic doesn't exist, send appropriate error code
+            new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+          }
         }
-      })
-    trace("Sending topic metadata for correlation id %d to client %s".format(metadataRequest.correlationId, metadataRequest.clientId))
-    topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
+      }
+
+    // handle auto create topics
+    topicMetadataList.foreach { topicMetadata =>
+      topicMetadata.errorCode match {
+        case ErrorMapping.NoError => topicsMetadata += topicMetadata
+        case ErrorMapping.UnknownTopicOrPartitionCode =>
+          if (config.autoCreateTopicsEnable) {
+            try {
+              CreateTopicCommand.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                .format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+            } catch {
+              case e: TopicExistsException => // let it go, possibly another broker created this topic
+            }
+            topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
+          } else {
+            topicsMetadata += topicMetadata
+          }
+        case _ =>
+          debug("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,
+            ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName))
+          topicsMetadata += topicMetadata
+      }
+    }
+    trace("Sending topic metadata %s for correlation id %d to client %s".format(topicsMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
     val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }

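The net effect of the KafkaApis changes above is that a broker now answers topic metadata requests from a local cache (leaderCache plus aliveBrokers, guarded by partitionMetadataLock) that update metadata requests keep current, instead of reading ZooKeeper on every request. A simplified, self-contained model of that read/write split is sketched below; the case classes are stand-ins for illustration, not Kafka's actual types.

    import scala.collection.mutable

    case class Broker(id: Int, host: String, port: Int)
    case class TopicAndPartition(topic: String, partition: Int)
    case class PartitionStateInfo(leader: Int, isr: Seq[Int], allReplicas: Set[Int])

    class MetadataCache {
      private val leaderCache = mutable.Map.empty[TopicAndPartition, PartitionStateInfo]
      private val aliveBrokers = mutable.Map.empty[Int, Broker]
      private val partitionMetadataLock = new Object

      // write path: applied when an update-metadata style request arrives from the controller
      def update(brokers: Seq[Broker], states: Map[TopicAndPartition, PartitionStateInfo]) {
        partitionMetadataLock synchronized {
          brokers.foreach(b => aliveBrokers.put(b.id, b))
          states.foreach { case (tp, state) => leaderCache.put(tp, state) }
        }
      }

      // read path: serves a topic-metadata style request without touching ZooKeeper
      def leadersFor(topic: String): Seq[(TopicAndPartition, Option[Broker])] = {
        partitionMetadataLock synchronized {
          leaderCache.toSeq
            .filter { case (tp, _) => tp.topic == topic }
            .sortBy { case (tp, _) => tp.partition }
            .map { case (tp, state) => (tp, aliveBrokers.get(state.leader)) }
        }
      }
    }

Serving reads from such a cache keeps a burst of client metadata requests from turning into repeated ZooKeeper lookups, which is the unavailability scenario described in KAFKA-901.
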
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfdc403e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 842dcf3..fed0b86 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -33,14 +33,15 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
       try {
         val req = requestChannel.receiveRequest()
         if(req eq RequestChannel.AllDone) {
-          trace("receives shut down command, shut down".format(brokerId, id))
+          debug("Kafka request handler %d on broker %d received shut down command".format(
+            id, brokerId))
           return
         }
         req.dequeueTimeMs = SystemTime.milliseconds
-        debug("handles request " + req)
+        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {
-        case e: Throwable => error("exception when handling request", e)
+        case e: Throwable => error("Exception when handling request")
       }
     }
   }
@@ -55,12 +56,12 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
-  for(i <- 0 until numThreads) { 
+  for(i <- 0 until numThreads) {
     runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
     threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
     threads(i).start()
   }
-  
+
   def shutdown() {
     info("shutting down")
     for(handler <- runnables)

