kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1368092 [1/4] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/network/ main/sca...
Date Wed, 01 Aug 2012 16:14:01 GMT
Author: junrao
Date: Wed Aug  1 16:13:59 2012
New Revision: 1368092

URL: http://svn.apache.org/viewvc?rev=1368092&view=rev
Log:
recommit: revisit the become leader and become follower state change operations using V3 design; patched by Yang Ye; reviewed by Neha Narkhede and Jun Rao; kafka-343

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Wed Aug  1 16:13:59 2012
@@ -91,10 +91,10 @@ object AdminUtils extends Logging {
     topics.map { topic =>
       if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
         val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic).iterator).get(topic).get
-        val sortedPartitions = topicPartitionAssignment.toList.sortWith( (m1,m2) => m1._1.toInt < m2._1.toInt )
+        val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
 
         val partitionMetadata = sortedPartitions.map { partitionMap =>
-          val partition = partitionMap._1.toInt
+          val partition = partitionMap._1
           val replicas = partitionMap._2
           val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
           val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala Wed Aug  1 16:13:59 2012
@@ -25,39 +25,54 @@ import collection.mutable.HashMap
 
 
 object LeaderAndISR {
+  val initialLeaderEpoch: Int = 0
+  val initialZKVersion: Int = 0
   def readFrom(buffer: ByteBuffer): LeaderAndISR = {
     val leader = buffer.getInt
     val leaderGenId = buffer.getInt
     val ISRString = Utils.readShortString(buffer, "UTF-8")
     val ISR = ISRString.split(",").map(_.toInt).toList
-    val zkVersion = buffer.getLong
+    val zkVersion = buffer.getInt
     new LeaderAndISR(leader, leaderGenId, ISR, zkVersion)
   }
 }
 
-case class LeaderAndISR(leader: Int, leaderEpoc: Int, ISR: List[Int], zkVersion: Long){
+case class LeaderAndISR(var leader: Int, var leaderEpoch: Int, var ISR: List[Int], var zkVersion: Int){
+  def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndISR.initialLeaderEpoch, ISR, LeaderAndISR.initialZKVersion)
+
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(leader)
-    buffer.putInt(leaderEpoc)
+    buffer.putInt(leaderEpoch)
     Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8")
-    buffer.putLong(zkVersion)
+    buffer.putInt(zkVersion)
   }
 
   def sizeInBytes(): Int = {
-    val size = 4 + 4 + (2 + ISR.mkString(",").length) + 8
+    val size = 4 + 4 + (2 + ISR.mkString(",").length) + 4
     size
   }
+
+  override def toString(): String = {
+    val jsonDataMap = new HashMap[String, String]
+    jsonDataMap.put("leader", leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
+    jsonDataMap.put("ISR", ISR.mkString(","))
+    Utils.stringMapToJsonString(jsonDataMap)
+  }
 }
 
 
 object LeaderAndISRRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
+  val IsInit: Boolean = true
+  val NotInit: Boolean = false
+  val DefaultAckTimeout: Int = 1000
 
   def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
-    val isInit = buffer.get()
+    val isInit = if(buffer.get() == 1.toByte) true else false
     val ackTimeoutMs = buffer.getInt
     val leaderAndISRRequestCount = buffer.getInt
     val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
@@ -76,19 +91,18 @@ object LeaderAndISRRequest {
 
 case class LeaderAndISRRequest (versionId: Short,
                                 clientId: String,
-                                isInit: Byte,
+                                isInit: Boolean,
                                 ackTimeoutMs: Int,
-                                leaderAndISRInfos:
-                                Map[(String, Int), LeaderAndISR])
+                                leaderAndISRInfos: Map[(String, Int), LeaderAndISR])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
-  def this(isInit: Byte, ackTimeoutMs: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
-    this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeoutMs, leaderAndISRInfos)
+  def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
+    this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, LeaderAndISRRequest.DefaultAckTimeout, leaderAndISRInfos)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
-    buffer.put(isInit)
+    buffer.put(if(isInit) 1.toByte else 0.toByte)
     buffer.putInt(ackTimeoutMs)
     buffer.putInt(leaderAndISRInfos.size)
     for((key, value) <- leaderAndISRInfos){

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Wed Aug  1 16:13:59 2012
@@ -26,6 +26,7 @@ import collection.mutable.Set
 object StopReplicaRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
+  val DefaultAckTimeout = 100
 
   def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
     val versionId = buffer.getShort
@@ -43,10 +44,10 @@ object StopReplicaRequest {
 case class StopReplicaRequest(versionId: Short,
                               clientId: String,
                               ackTimeoutMs: Int,
-                              stopReplicaSet: Set[(String, Int)]
-                                     ) extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
-  def this(ackTimeoutMs: Int, stopReplicaSet: Set[(String, Int)]) = {
-    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeoutMs, stopReplicaSet)
+                              stopReplicaSet: Set[(String, Int)])
+        extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
+  def this(stopReplicaSet: Set[(String, Int)]) = {
+    this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -67,4 +68,4 @@ case class StopReplicaRequest(versionId:
     }
     size
   }
-}
\ No newline at end of file
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Wed Aug  1 16:13:59 2012
@@ -19,7 +19,7 @@ package kafka.cluster
 
 import kafka.utils.Utils._
 import java.nio.ByteBuffer
-import kafka.common.KafkaException
+import kafka.common.BrokerNotExistException
 
 /**
  * A Kafka broker
@@ -28,7 +28,7 @@ private[kafka] object Broker {
 
   def createBroker(id: Int, brokerInfoString: String): Broker = {
     if(brokerInfoString == null)
-      throw new KafkaException("Broker id %s does not exist".format(id))
+      throw new BrokerNotExistException("Broker id %s does not exist".format(id))
     val brokerInfo = brokerInfoString.split(":")
     new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Wed Aug  1 16:13:59 2012
@@ -16,11 +16,12 @@
  */
 package kafka.cluster
 
-import kafka.utils.{SystemTime, Time, Logging}
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.ZkUtils._
 import java.util.concurrent.locks.ReentrantLock
+import scala.collection._
+import kafka.utils.{ZkUtils, SystemTime, Time}
 import kafka.common.{KafkaException, LeaderNotAvailableException}
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.Logging
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -119,14 +120,8 @@ class Partition(val topic: String,
   }
 
   def updateISR(newISR: Set[Int], zkClientOpt: Option[ZkClient] = None) {
-    try {
+    try{
       leaderISRUpdateLock.lock()
-      zkClientOpt match {
-        case Some(zkClient) =>
-          // update ISR in ZK
-          updateISRInZk(newISR, zkClient)
-        case None =>
-      }
       // update partition's ISR in cache
       inSyncReplicas = newISR.map {r =>
         getReplica(r) match {
@@ -135,29 +130,22 @@ class Partition(val topic: String,
         }
       }
       info("Updated ISR for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
-    }catch {
-      case e => throw new KafkaException("Failed to update ISR for topic %s ".format(topic) +
-        "partition %d to %s".format(partitionId, newISR.mkString(",")), e)
-    }finally {
+      if(zkClientOpt.isDefined){
+        val zkClient = zkClientOpt.get
+        val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partitionId)
+        curLeaderAndISR match {
+          case None =>
+            throw new IllegalStateException("The leaderAndISR info for partition [%s, %s] is not in Zookeeper".format(topic, partitionId))
+          case Some(m) =>
+            m.ISR = newISR.toList
+            ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), m.toString)
+        }
+      }
+    } finally {
       leaderISRUpdateLock.unlock()
     }
   }
 
-  private def updateISRInZk(newISR: Set[Int], zkClient: ZkClient) = {
-    val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString))
-    if(replicaListAndEpochString == null) {
-      throw new LeaderNotAvailableException(("Illegal partition state. ISR cannot be updated for topic " +
-        "%s partition %d since leader and ISR does not exist in ZK".format(topic, partitionId)))
-    }
-    else {
-      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
-      val epoch = replicasAndEpochInfo.last
-      updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString),
-        "%s;%s".format(newISR.mkString(","), epoch))
-      info("Updated ISR for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(",")))
-    }
-  }
-
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Partition]))
       return false
@@ -180,4 +168,4 @@ class Partition(val topic: String,
     partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
     partitionString.toString()
   }
-}
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala?rev=1368092&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala Wed Aug  1 16:13:59 2012
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class BrokerNotExistException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Wed Aug  1 16:13:59 2012
@@ -39,7 +39,8 @@ object ErrorMapping {
   val NotLeaderForPartitionCode : Short = 7
   val UnknownTopicCode : Short = 8
   val RequestTimedOutCode: Short = 9
-  val ReplicaNotAvailableCode: Short = 10
+  val BrokerNotExistInZookeeperCode: Short = 10
+  val ReplicaNotAvailableCode: Short = 11
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -51,8 +52,9 @@ object ErrorMapping {
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
-      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
-      classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
+      classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode,
+      classOf[BrokerNotExistException].asInstanceOf[Class[Throwable]] -> BrokerNotExistInZookeeperCode,
+      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Wed Aug  1 16:13:59 2012
@@ -88,8 +88,7 @@ class ConsumerFetcherManager(private val
 
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
-                              config, sourceBroker, this)
+    new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), config, sourceBroker, this)
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/TopicCount.scala Wed Aug  1 16:13:59 2012
@@ -66,7 +66,7 @@ private[kafka] object TopicCount extends
                           consumerId: String,
                           zkClient: ZkClient) : TopicCount = {
     val dirs = new ZKGroupDirs(group)
-    val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
+    val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1
     val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER)
     val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Aug  1 16:13:59 2012
@@ -296,7 +296,7 @@ private[kafka] class ZookeeperConsumerCo
     try {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
       val znode = topicDirs.consumerOffsetDir + "/" + partitionId
-      val offsetString = readDataMaybeNull(zkClient, znode)
+      val offsetString = readDataMaybeNull(zkClient, znode)._1
       if (offsetString != null)
         return offsetString.toLong
       else
@@ -416,7 +416,7 @@ private[kafka] class ZookeeperConsumerCo
       }
     }
 
-    private def deletePartitionOwnershipFromZK(topic: String, partition: String) {
+    private def deletePartitionOwnershipFromZK(topic: String, partition: Int) {
       val topicDirs = new ZKGroupTopicDirs(group, topic)
       val znode = topicDirs.consumerOwnerDir + "/" + partition
       deletePath(zkClient, znode)
@@ -427,7 +427,7 @@ private[kafka] class ZookeeperConsumerCo
       info("Releasing partition ownership")
       for ((topic, infos) <- localTopicRegistry) {
         for(partition <- infos.keys)
-          deletePartitionOwnershipFromZK(topic, partition.toString)
+          deletePartitionOwnershipFromZK(topic, partition)
         localTopicRegistry.remove(topic)
       }
     }
@@ -484,7 +484,7 @@ private[kafka] class ZookeeperConsumerCo
 
       releasePartitionOwnership(topicRegistry)
 
-      var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
+      var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
       var currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
 
       for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
@@ -492,7 +492,7 @@ private[kafka] class ZookeeperConsumerCo
 
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         val curConsumers = consumersPerTopicMap.get(topic).get
-        var curPartitions: Seq[String] = partitionsPerTopicMap.get(topic).get
+        var curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
 
         val nPartsPerConsumer = curPartitions.size / curConsumers.size
         val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
@@ -602,13 +602,13 @@ private[kafka] class ZookeeperConsumerCo
       }
     }
 
-    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
-      var successfullyOwnedPartitions : List[(String, String)] = Nil
+    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, Int), String]): Boolean = {
+      var successfullyOwnedPartitions : List[(String, Int)] = Nil
       val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
         val topic = partitionOwner._1._1
         val partition = partitionOwner._1._2
         val consumerThreadId = partitionOwner._2
-        val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic,partition)
+        val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
         try {
           createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
           info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
@@ -633,29 +633,29 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
-                                      topicDirs: ZKGroupTopicDirs, partition: String,
+                                      topicDirs: ZKGroupTopicDirs, partition: Int,
                                       topic: String, consumerThreadId: String) {
       val partTopicInfoMap = currentTopicRegistry.get(topic)
 
       // find the leader for this partition
-      val leaderOpt = getLeaderForPartition(zkClient, topic, partition.toInt)
+      val leaderOpt = getLeaderForPartition(zkClient, topic, partition)
       leaderOpt match {
-        case None => throw new NoBrokersForPartitionException("No leader available for partition %s on topic %s".
+        case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s".
           format(partition, topic))
-        case Some(l) => debug("Leader for partition %s for topic %s is %d".format(partition, topic, l))
+        case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l))
       }
       val leader = leaderOpt.get
 
       val znode = topicDirs.consumerOffsetDir + "/" + partition
-      val offsetString = readDataMaybeNull(zkClient, znode)
+      val offsetString = readDataMaybeNull(zkClient, znode)._1
       // If first time starting a consumer, set the initial offset based on the config
       var offset : Long = 0L
       if (offsetString == null)
         offset = config.autoOffsetReset match {
               case OffsetRequest.SmallestTimeString =>
-                  earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.EarliestTime)
+                  earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime)
               case OffsetRequest.LargestTimeString =>
-                  earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.LatestTime)
+                  earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime)
               case _ =>
                   throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
         }
@@ -666,12 +666,12 @@ private[kafka] class ZookeeperConsumerCo
       val fetchedOffset = new AtomicLong(offset)
       val partTopicInfo = new PartitionTopicInfo(topic,
                                                  leader,
-                                                 partition.toInt,
+                                                 partition,
                                                  queue,
                                                  consumedOffset,
                                                  fetchedOffset,
                                                  new AtomicInteger(config.fetchSize))
-      partTopicInfoMap.put(partition.toInt, partTopicInfo)
+      partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Aug  1 16:13:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -29,7 +29,6 @@ import kafka.common.{KafkaException, Inv
 
 object Log {
   val FileSuffix = ".kafka"
-  val hwFileName = "highwatermark"
 
   /**
    * Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
@@ -115,8 +114,8 @@ class LogSegment(val file: File, val mes
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean, time: Time)
-  extends Logging {
+private[kafka] class Log( val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean, time: Time, brokerId: Int = 0) extends Logging {
+  this.logIdent = "Kafka Log on Broker " + brokerId + ", "
 
   import kafka.log.Log._
 
@@ -126,7 +125,7 @@ private[kafka] class Log(val dir: File, 
   /* The current number of unflushed messages appended to the write */
   private val unflushed = new AtomicInteger(0)
 
-   /* last time it was flushed */
+  /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
 
   /* The actual segments of the log */
@@ -191,8 +190,7 @@ private[kafka] class Log(val dir: File, 
         val curr = segments.get(i)
         val next = segments.get(i+1)
         if(curr.start + curr.size != next.start)
-          throw new KafkaException("The following segments don't validate: " +
-                  curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
+          throw new KafkaException("The following segments don't validate: " + curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
       }
     }
   }
@@ -231,13 +229,12 @@ private[kafka] class Log(val dir: File, 
     BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages)
     BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
     logStats.recordAppendedMessages(numberOfMessages)
-    
+
     // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
     val validByteBuffer = messages.getBuffer.duplicate()
     val messageSetValidBytes = messages.validBytes
     if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
-      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
-        " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
 
     validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
     val validMessages = new ByteBufferMessageSet(validByteBuffer)
@@ -375,15 +372,15 @@ private[kafka] class Log(val dir: File, 
       case OffsetRequest.EarliestTime =>
         startIndex = 0
       case _ =>
-          var isFound = false
-          debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
-          startIndex = offsetTimeArray.length - 1
-          while (startIndex >= 0 && !isFound) {
-            if (offsetTimeArray(startIndex)._2 <= request.time)
-              isFound = true
-            else
-              startIndex -=1
-          }
+        var isFound = false
+        debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
+        startIndex = offsetTimeArray.length - 1
+        while (startIndex >= 0 && !isFound) {
+          if (offsetTimeArray(startIndex)._2 <= request.time)
+            isFound = true
+          else
+            startIndex -=1
+        }
     }
 
     val retSize = request.maxNumOffsets.min(startIndex + 1)
@@ -408,7 +405,13 @@ private[kafka] class Log(val dir: File, 
     }
   }
 
-  /* Attemps to delete all provided segments from a log and returns how many it was able to */
+
+  def deleteWholeLog():Unit = {
+    deleteSegments(segments.contents.get())
+    Utils.rm(dir)
+  }
+
+  /* Attempts to delete all provided segments from a log and returns how many it was able to */
   def deleteSegments(segments: Seq[LogSegment]): Int = {
     var total = 0
     for(segment <- segments) {
@@ -424,30 +427,27 @@ private[kafka] class Log(val dir: File, 
   }
 
   def truncateTo(targetOffset: Long) {
-      // find the log segment that has this hw
-      val segmentToBeTruncated = segments.view.find(segment =>
-        targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
-
-      segmentToBeTruncated match {
-        case Some(segment) =>
-          val truncatedSegmentIndex = segments.view.indexOf(segment)
-          segments.truncLast(truncatedSegmentIndex)
-          segment.truncateTo(targetOffset)
-          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
-        case None =>
-          assert(targetOffset <= segments.view.last.absoluteEndOffset,
-            "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
-              format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
-          error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
-            .format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
-      }
+    // find the log segment that has this hw
+    val segmentToBeTruncated = segments.view.find(
+      segment => targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
+
+    segmentToBeTruncated match {
+      case Some(segment) =>
+        val truncatedSegmentIndex = segments.view.indexOf(segment)
+        segments.truncLast(truncatedSegmentIndex)
+        segment.truncateTo(targetOffset)
+        info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
+      case None =>
+        assert(targetOffset <= segments.view.last.absoluteEndOffset, "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
+        error("Cannot truncate log to %d since the log start offset is %d and end offset is %d".format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
+    }
 
-      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
-      if(segmentsToBeDeleted.size < segments.view.size) {
+    val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
+    if(segmentsToBeDeleted.size < segments.view.size) {
       val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
       if(numSegmentsDeleted != segmentsToBeDeleted.size)
         error("Failed to delete some segments during log recovery")
-      }
+    }
   }
 
   def topicName():String = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Wed Aug  1 16:13:59 2012
@@ -1,11 +1,11 @@
 /**
- 	* Licensed to the Apache Software Foundation (ASF) under one or more
+ * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -35,7 +35,7 @@ private[kafka] class LogManager(val conf
                                 val logCleanupIntervalMs: Long,
                                 val logCleanupDefaultAgeMs: Long,
                                 needRecovery: Boolean) extends Logging {
-  
+
   val logDir: File = new File(config.logDir)
   private val numPartitions = config.numPartitions
   private val maxSize: Long = config.logFileSize
@@ -44,6 +44,7 @@ private[kafka] class LogManager(val conf
   private val logFlushIntervals = config.flushIntervalMap
   private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
   private val logRetentionSize = config.logRetentionSize
+  this.logIdent = "Log Manager on Broker " + config.brokerId + ", "
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -60,11 +61,11 @@ private[kafka] class LogManager(val conf
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, flushInterval, needRecovery, time)
-        val topicPartion = Utils.getTopicPartition(dir.getName)
-        logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
-        val parts = logs.get(topicPartion._1)
-        parts.put(topicPartion._2, log)
+        val log = new Log(dir, maxSize, flushInterval, needRecovery, time, config.brokerId)
+        val topicPartition = Utils.getTopicPartition(dir.getName)
+        logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
+        val parts = logs.get(topicPartition._1)
+        parts.put(topicPartition._2, log)
       }
     }
   }
@@ -78,9 +79,9 @@ private[kafka] class LogManager(val conf
       info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
       scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
       info("Starting log flusher every " + config.flushSchedulerThreadRate +
-        " ms with the following overrides " + logFlushIntervals)
+                   " ms with the following overrides " + logFlushIntervals)
       scheduler.scheduleWithRate(flushAllLogs, "kafka-logflusher-",
-        config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
+                                 config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
     }
   }
 
@@ -93,14 +94,14 @@ private[kafka] class LogManager(val conf
       throw new InvalidTopicException("Topic name can't be emtpy")
     if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
       val error = "Wrong partition %d, valid partitions (0, %d)."
-        .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
+              .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
       warn(error)
       throw new InvalidPartitionException(error)
     }
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, flushInterval, false, time)
+      new Log(d, maxSize, flushInterval, false, time, config.brokerId)
     }
   }
 
@@ -195,18 +196,19 @@ private[kafka] class LogManager(val conf
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
-    debug("Log cleanup completed. " + total + " files deleted in " + 
-                 (time.milliseconds - startMs) / 1000 + " seconds")
+    debug("Log cleanup completed. " + total + " files deleted in " +
+                  (time.milliseconds - startMs) / 1000 + " seconds")
   }
-  
+
   /**
    * Close all the logs
    */
   def shutdown() {
-    info("Closing log manager")
+    info("shut down")
     allLogs.foreach(_.close())
+    info("shutted down completedly")
   }
-  
+
   /**
    * Get all the partition logs
    */
@@ -222,7 +224,7 @@ private[kafka] class LogManager(val conf
         if(logFlushIntervals.contains(log.topicName))
           logFlushInterval = logFlushIntervals(log.topicName)
         debug(log.topicName + " flush interval  " + logFlushInterval +
-            " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
+                      " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Wed Aug  1 16:13:59 2012
@@ -31,12 +31,13 @@ import kafka.utils._
  *   N Processor threads that each have their own selector and read requests from sockets
  *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
  */
-class SocketServer(val port: Int,
+class SocketServer(val brokerId: Int,
+                   val port: Int,
                    val numProcessorThreads: Int, 
                    val monitoringPeriodSecs: Int,
                    val maxQueuedRequests: Int,
                    val maxRequestSize: Int = Int.MaxValue) extends Logging {
-
+  this.logIdent = "Socket Server on Broker " + brokerId + ", "
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
   private var acceptor: Acceptor = new Acceptor(port, processors)
@@ -57,18 +58,18 @@ class SocketServer(val port: Int,
     // start accepting connections
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
-    info("Kafka socket server started")
+    info("started")
   }
 
   /**
    * Shutdown the socket server
    */
   def shutdown() = {
-    info("Shutting down socket server")
+    info("shutting down")
     acceptor.shutdown
     for(processor <- processors)
       processor.shutdown
-    info("Shut down socket server complete")
+    info("shutted down completely")
   }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Wed Aug  1 16:13:59 2012
@@ -23,7 +23,7 @@ import kafka.cluster.Broker
 
 abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
-  private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread]
+  private val fetcherThreadMap = new mutable.HashMap[(Int, Int), AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = name + " "
 
@@ -52,7 +52,7 @@ abstract class AbstractFetcherManager(pr
   }
 
   def removeFetcher(topic: String, partitionId: Int) {
-    info("%s removing fetcher on topic %s, partition %d".format(name, topic, partitionId))
+    info("removing fetcher on topic %s, partition %d".format(topic, partitionId))
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Aug  1 16:13:59 2012
@@ -25,24 +25,29 @@ import kafka.common._
 import kafka.log._
 import kafka.message._
 import kafka.network._
-import kafka.utils.{SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
 import scala.math._
 import kafka.network.RequestChannel.Response
+import kafka.utils.{ZkUtils, SystemTime, Logging}
+import kafka.cluster.Replica
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel,
-                val logManager: LogManager,
-                val replicaManager: ReplicaManager,
-                val kafkaZookeeper: KafkaZooKeeper) extends Logging {
+class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
+                val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper,
+                addReplicaCbk: (String, Int, Set[Int]) => Replica,
+                stopReplicaCbk: (String, Int) => Short,
+                becomeLeader: (Replica, LeaderAndISR) => Short,
+                becomeFollower: (Replica, LeaderAndISR) => Short,
+                brokerId: Int) extends Logging {
 
-  private val produceRequestPurgatory = new ProducerRequestPurgatory
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
+  private val produceRequestPurgatory = new ProducerRequestPurgatory(brokerId)
+  private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
+  this.logIdent = "KafkaApi on Broker " + brokerId + ", "
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -62,12 +67,46 @@ class KafkaApis(val requestChannel: Requ
 
 
   def handleLeaderAndISRRequest(request: RequestChannel.Request){
-    val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
     val responseMap = new HashMap[(String, Int), Short]
+    val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
+    info("handling leader and isr request " + leaderAndISRRequest)
+
+    for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
+      var errorCode = ErrorMapping.NoError
+      val topic = partitionInfo._1
+      val partition = partitionInfo._2
+
+      // If the partition does not exist locally, create it
+      if(replicaManager.getPartition(topic, partition) == None){
+        trace("the partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition))
+        val assignedReplicas = ZkUtils.getReplicasForPartition(kafkaZookeeper.getZookeeperClient, topic, partition)
+        trace("assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString))
+        if(assignedReplicas.contains(brokerId)) {
+          val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
+          info("starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
+        }
+      }
+      val replica = replicaManager.getReplica(topic, partition).get
+      // The command ask this broker to be new leader for P and it isn't the leader yet
+      val requestedLeaderId = leaderAndISR.leader
+      // If the broker is requested to be the leader and it's not current the leader (the leader id is set and not equal to broker id)
+      if(requestedLeaderId == brokerId && (!replica.partition.leaderId().isDefined || replica.partition.leaderId().get != brokerId)){
+        info("becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
+        errorCode = becomeLeader(replica, leaderAndISR)
+      }
+      else if (requestedLeaderId != brokerId) {
+        info("becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest))
+        errorCode = becomeFollower(replica, leaderAndISR)
+      }
+
+      responseMap.put(partitionInfo, errorCode)
+    }
 
-    // TODO: put in actual logic later
-    for((key, value) <- leaderAndISRRequest.leaderAndISRInfos){
-      responseMap.put(key, ErrorMapping.NoError)
+    if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){
+      replicaManager.startHighWaterMarksCheckPointThread
+      val partitionsToRemove = replicaManager.allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).keySet
+      info("init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
+      partitionsToRemove.foreach(p => stopReplicaCbk(p._1, p._2))
     }
 
     val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
@@ -79,9 +118,9 @@ class KafkaApis(val requestChannel: Requ
     val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
     val responseMap = new HashMap[(String, Int), Short]
 
-    // TODO: put in actual logic later
     for((topic, partition) <- stopReplicaRequest.stopReplicaSet){
-      responseMap.put((topic, partition), ErrorMapping.NoError)
+      val errorCode = stopReplicaCbk(topic, partition)
+      responseMap.put((topic, partition), errorCode)
     }
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
@@ -95,7 +134,7 @@ class KafkaApis(val requestChannel: Requ
     var satisfied = new mutable.ArrayBuffer[DelayedFetch]
     for(partitionData <- partitionDatas)
       satisfied ++= fetchRequestPurgatory.update((topic, partitionData.partition), partitionData)
-    trace("Produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
+    trace("produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
@@ -111,11 +150,11 @@ class KafkaApis(val requestChannel: Requ
     val produceRequest = ProducerRequest.readFrom(request.request.buffer)
     val sTime = SystemTime.milliseconds
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Producer request " + request.toString)
+      requestLogger.trace("producer request %s".format(produceRequest.toString))
     trace("Broker %s received produce request %s".format(logManager.config.brokerId, produceRequest.toString))
 
     val response = produceToLocalLog(produceRequest)
-    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+    debug("produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
     if (produceRequest.requiredAcks == 0 ||
         produceRequest.requiredAcks == 1 ||
@@ -133,13 +172,11 @@ class KafkaApis(val requestChannel: Requ
           (topic, partitionData.partition)
         })
       })
-
       val delayedProduce = new DelayedProduce(
         topicPartitionPairs, request,
         response.errors, response.offsets,
         produceRequest, produceRequest.ackTimeoutMs.toLong)
       produceRequestPurgatory.watch(delayedProduce)
-
       /*
        * Replica fetch requests may have arrived (and potentially satisfied)
        * delayedProduce requests before they even made it to the purgatory.
@@ -147,10 +184,9 @@ class KafkaApis(val requestChannel: Requ
        */
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
       topicPartitionPairs.foreach(topicPartition =>
-        satisfiedProduceRequests ++=
-          produceRequestPurgatory.update(topicPartition, topicPartition))
-      debug(satisfiedProduceRequests.size +
-        " DelayedProduce requests unblocked after produce to local log.")
+                                    satisfiedProduceRequests ++=
+                                            produceRequestPurgatory.update(topicPartition, topicPartition))
+      debug("%d DelayedProduce requests unblocked after produce to local log.".format(satisfiedProduceRequests.size))
       satisfiedProduceRequests.foreach(_.respond())
     }
   }
@@ -159,10 +195,10 @@ class KafkaApis(val requestChannel: Requ
    * Helper method for handling a parsed producer request
    */
   private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
+    trace("produce [%s] to local log ".format(request.toString))
     val requestSize = request.topicPartitionCount
     val errors = new Array[Short](requestSize)
     val offsets = new Array[Long](requestSize)
-
     var msgIndex = -1
     for(topicData <- request.data) {
       for(partitionData <- topicData.partitionDataArray) {
@@ -181,7 +217,7 @@ class KafkaApis(val requestChannel: Requ
           case e =>
             BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-            error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
+            error("error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
             e match {
               case _: IOException =>
                 fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
@@ -193,7 +229,8 @@ class KafkaApis(val requestChannel: Requ
         }
       }
     }
-    new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
+    val ret = new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
+    ret
   }
 
   /**
@@ -201,9 +238,7 @@ class KafkaApis(val requestChannel: Requ
    */
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = FetchRequest.readFrom(request.request.buffer)
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Fetch request " + fetchRequest.toString)
-    trace("Broker %s received fetch request %s".format(logManager.config.brokerId, fetchRequest.toString))
+    trace("handling fetch request: " + fetchRequest.toString)
     // validate the request
     try {
       fetchRequest.validate()
@@ -225,8 +260,7 @@ class KafkaApis(val requestChannel: Requ
           )
         })
       })
-      trace("Replica %d fetch unblocked %d DelayedProduce requests.".format(
-        fetchRequest.replicaId, satisfiedProduceRequests.size))
+      debug("replica %d fetch unblocked %d DelayedProduce requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
       satisfiedProduceRequests.foreach(_.respond())
     }
 
@@ -236,11 +270,11 @@ class KafkaApis(val requestChannel: Requ
        availableBytes >= fetchRequest.minBytes ||
        fetchRequest.numPartitions <= 0) {
       val topicData = readMessageSets(fetchRequest)
-      debug("Returning fetch response %s for fetch request with correlation id %d"
-        .format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
+      debug("returning fetch response %s for fetch request with correlation id %d".format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
+      debug("putting fetch request into purgatory")
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val topicPartitionPairs: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
       val delayedFetch = new DelayedFetch(topicPartitionPairs, request, fetchRequest, fetchRequest.maxWait, availableBytes)
@@ -256,7 +290,6 @@ class KafkaApis(val requestChannel: Requ
     for(offsetDetail <- fetchRequest.offsetInfo) {
       for(i <- 0 until offsetDetail.partitions.size) {
         try {
-          debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
           val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
           val available = maybeLog match {
             case Some(log) => max(0, log.logEndOffset - offsetDetail.offsets(i))
@@ -265,7 +298,7 @@ class KafkaApis(val requestChannel: Requ
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
           case e: InvalidPartitionException =>
-            info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
+            info("invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
         }
       }
     }
@@ -274,13 +307,13 @@ class KafkaApis(val requestChannel: Requ
 
   private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
     val offsets = fetchRequest.offsetInfo
-
+    debug("act on update partition HW, check offset detail: %s ".format(offsets))
     for(offsetDetail <- offsets) {
       val topic = offsetDetail.topic
       val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
       for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
         replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset,
-          kafkaZookeeper.getZookeeperClient)
+                                              kafkaZookeeper.getZookeeperClient)
       }
     }
   }
@@ -310,21 +343,17 @@ class KafkaApis(val requestChannel: Requ
             BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
             BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
             val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
-            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) +
-              " must exist on leader broker %d".format(logManager.config.brokerId))
+            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(logManager.config.brokerId))
             val leaderReplica = leaderReplicaOpt.get
             fetchRequest.replicaId match {
               case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
               case _ => // fetch request from a follower
                 val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
-                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d"
-                  .format(fetchRequest.replicaId, replicaManager.config.brokerId))
+                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, replicaManager.config.brokerId))
                 val replica = replicaOpt.get
-                debug("Leader %d for topic %s partition %d received fetch request from follower %d"
-                  .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
-                debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
-                  .format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
             }
         }
@@ -343,7 +372,7 @@ class KafkaApis(val requestChannel: Requ
     try {
       // check if the current broker is the leader for the partitions
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
-      trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+      trace("fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
       val log = logManager.getLog(topic, partition)
       response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
     } catch {
@@ -360,7 +389,7 @@ class KafkaApis(val requestChannel: Requ
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Offset request " + offsetRequest.toString)
+      requestLogger.trace("offset request " + offsetRequest.toString)
     var response: OffsetResponse = null
     try {
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
@@ -372,8 +401,7 @@ class KafkaApis(val requestChannel: Requ
         System.exit(1)
       case e =>
         warn("Error while responding to offset request", e)
-        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
-          ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
+        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
     }
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
@@ -384,41 +412,39 @@ class KafkaApis(val requestChannel: Requ
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Topic metadata request " + metadataRequest.toString())
-
+      requestLogger.trace("topic metadata request " + metadataRequest.toString())
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val zkClient = kafkaZookeeper.getZookeeperClient
     var errorCode = ErrorMapping.NoError
     val config = logManager.config
-
     try {
       val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
-
-      metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
-        val topic = topicAndMetadata._1
-        topicAndMetadata._2.errorCode match {
-          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
-          case ErrorMapping.UnknownTopicCode =>
-            /* check if auto creation of topics is turned on */
-            if(config.autoCreateTopics) {
-              CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                .format(topic, config.numPartitions, config.defaultReplicationFactor))
-              val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-              newTopicMetadata.errorCode match {
-                case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
-                case _ =>
-                  throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
+      metadataRequest.topics.zip(topicMetadataList).foreach(
+        topicAndMetadata =>{
+          val topic = topicAndMetadata._1
+          topicAndMetadata._2.errorCode match {
+            case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
+            case ErrorMapping.UnknownTopicCode =>
+              /* check if auto creation of topics is turned on */
+              if(config.autoCreateTopics) {
+                CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+                info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                             .format(topic, config.numPartitions, config.defaultReplicationFactor))
+                val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+                newTopicMetadata.errorCode match {
+                  case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
+                  case _ =>
+                    throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
+                }
               }
-            }
-          case _ => error("Error while fetching topic metadata for topic " + topic,
-            ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
-        }
-      }
+            case _ => error("Error while fetching topic metadata for topic " + topic,
+                            ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
+          }
+        })
     }catch {
       case e => error("Error while retrieving topic metadata", e)
-        // convert exception type to error code
-        errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+      // convert exception type to error code
+      errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
     }
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
     val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
@@ -426,7 +452,10 @@ class KafkaApis(val requestChannel: Requ
   }
 
   def close() {
+    debug("shut down")
     fetchRequestPurgatory.shutdown()
+    produceRequestPurgatory.shutdown()
+    debug("shut down completely")
   }
 
   /**
@@ -439,7 +468,7 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData] {
+  class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData]("Fetch Request Purgatory on Broker " + brokerId + ", ") {
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
@@ -447,6 +476,7 @@ class KafkaApis(val requestChannel: Requ
     def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = {
       val messageDataSize = partitionData.messages.sizeInBytes
       val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
+      debug("fetch request check, accumulated size: " + accumulatedSize + " delayed fetch min bytes: " + delayedFetch.fetch.minBytes)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
 
@@ -466,7 +496,7 @@ class KafkaApis(val requestChannel: Requ
                        requiredOffsets: Array[Long],
                        val produce: ProducerRequest,
                        delayMs: Long)
-    extends DelayedRequest(keys, request, delayMs) with Logging {
+          extends DelayedRequest(keys, request, delayMs) with Logging {
 
     /**
      * Map of (topic, partition) -> partition status
@@ -495,15 +525,15 @@ class KafkaApis(val requestChannel: Requ
 
     def respond() {
       val errorsAndOffsets: (List[Short], List[Long]) = (
-        keys.foldRight
-          ((List[Short](), List[Long]()))
-          ((key: Any, result: (List[Short], List[Long])) => {
-            val status = partitionStatus(key)
-            (status.error :: result._1, status.requiredOffset :: result._2)
-          })
-        )
+              keys.foldRight
+                      ((List[Short](), List[Long]()))
+                      ((key: Any, result: (List[Short], List[Long])) => {
+                        val status = partitionStatus(key)
+                        (status.error :: result._1, status.requiredOffset :: result._2)
+                      })
+              )
       val response = new ProducerResponse(produce.versionId, produce.correlationId,
-        errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
+                                          errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
@@ -539,7 +569,7 @@ class KafkaApis(val requestChannel: Requ
                 numAcks, produce.requiredAcks,
                 topic, partitionId))
               if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
-                (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
+                      (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
                 /*
                  * requiredAcks < 0 means acknowledge after all replicas in ISR
                  * are fully caught up to the (local) leader's offset
@@ -588,8 +618,7 @@ class KafkaApis(val requestChannel: Requ
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory
-    extends RequestPurgatory[DelayedProduce, (String, Int)] {
+  private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, (String, Int)]("Producer Request Purgatory on Broker " + brokerId + ", ") {
 
     protected def checkSatisfied(fetchRequestPartition: (String, Int),
                                  delayedProduce: DelayedProduce) =



Mime
View raw message