kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: svn commit: r1344964 [1/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/log/ core/src/...
Date: Fri, 01 Jun 2012 01:53:21 GMT
Author: nehanarkhede
Date: Fri Jun  1 01:53:19 2012
New Revision: 1344964

URL: http://svn.apache.org/viewvc?rev=1344964&view=rev
Log:
KAFKA-46: Message replication feature without failures; patched by Neha Narkhede; reviewed by Jun Rao, Jay Kreps, Prashanth Menon

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/inmemory/
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/   (props changed)
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Fri Jun  1 01:53:19 2012
@@ -270,7 +270,7 @@ public class KafkaETLContext {
         } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
             throw new IOException(_input + " current offset=" + _offset
                     + " : invalid offset.");
-        } else if (errorCode == ErrorMapping.WrongPartitionCode()) {
+        } else if (errorCode == ErrorMapping.InvalidPartitionCode()) {
             throw new IOException(_input + " : wrong partition");
         } else if (errorCode != ErrorMapping.NoError()) {
             throw new IOException(_input + " current offset=" + _offset

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Fri Jun  1 01:53:19 2012
@@ -18,10 +18,10 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.FetchRequestFormatException
 import kafka.network.Request
 import kafka.utils.Utils
 import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
+import kafka.common.FetchRequestFormatException
 
 object OffsetDetail {
 
@@ -116,7 +116,8 @@ case class FetchRequest(versionId: Short
     var topics = Set[String]()
     val iter = offsetInfo.iterator
     while(iter.hasNext) {
-      val topic = iter.next.topic
+      val offsetData = iter.next()
+      val topic = offsetData.topic
       if(topics.contains(topic))
         throw new FetchRequestFormatException("FetchRequest has multiple OffsetDetails for topic: " + topic)
       else

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Fri Jun  1 01:53:19 2012
@@ -29,18 +29,20 @@ object PartitionData {
     val partition = buffer.getInt
     val error = buffer.getInt
     val initialOffset = buffer.getLong
+    val hw = buffer.getLong()
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new PartitionData(partition, error, initialOffset, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
+    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
   }
 }
 
-case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
-  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
+case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L,
+                         messages: MessageSet) {
+  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() + 8
 
-  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
+  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
 
 }
 
@@ -117,6 +119,14 @@ case class FetchResponse(versionId: Shor
     }
     messageSet.asInstanceOf[ByteBufferMessageSet]
   }
+
+  def highWatermark(topic: String, partition: Int): Long = {
+    topicMap.get(topic) match {
+      case Some(topicData) =>
+        TopicData.findPartition(topicData.partitionData, partition).map(_.hw).getOrElse(-1L)
+      case None => -1L
+    }
+  }
 }
 
 // SENDS
@@ -125,10 +135,11 @@ class PartitionDataSend(val partitionDat
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(20)
+  private val buffer = ByteBuffer.allocate(28)
   buffer.putInt(partitionData.partition)
   buffer.putInt(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
+  buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes.intValue())
   buffer.rewind()
 

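For reference, the PartitionDataSend change above widens the per-partition response header from 20 to 28 bytes by carrying the leader's high watermark between the initial offset and the message-set size; PartitionData.readFrom consumes the fields in the same order. A minimal sketch of the implied layout (the object and method names below are illustrative, not part of this patch):

    import java.nio.ByteBuffer

    object PartitionHeaderSketch {
      // Mirrors the order used by PartitionDataSend: partition (4) + error (4) +
      // initialOffset (8) + hw (8) + messageSetSize (4) = 28 bytes, followed by the
      // message set bytes themselves.
      def encodeHeader(partition: Int, error: Int, initialOffset: Long, hw: Long, messageSetSize: Int): ByteBuffer = {
        val buffer = ByteBuffer.allocate(28)
        buffer.putInt(partition)
        buffer.putInt(error)
        buffer.putLong(initialOffset)
        buffer.putLong(hw)            // new field: exposes the leader's high watermark to followers and consumers
        buffer.putInt(messageSetSize)
        buffer.rewind()
        buffer
      }
    }
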
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Fri Jun  1 01:53:19 2012
@@ -16,12 +16,168 @@
  */
 package kafka.cluster
 
+import kafka.common.NoLeaderForPartitionException
+import kafka.utils.{SystemTime, Time, Logging}
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils._
+import java.util.concurrent.locks.ReentrantLock
+import java.lang.IllegalStateException
+
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
- * TODO: Commit queue to be added as part of KAFKA-46. Add AR, ISR, CUR, RAR state maintenance as part of KAFKA-302
  */
-case class Partition(topic: String, val partId: Int, var leader: Option[Replica] = None,
-                     assignedReplicas: Set[Replica] = Set.empty[Replica],
-                     inSyncReplicas: Set[Replica] = Set.empty[Replica],
-                     catchUpReplicas: Set[Replica] = Set.empty[Replica],
-                     reassignedReplicas: Set[Replica] = Set.empty[Replica])
+class Partition(val topic: String,
+                val partitionId: Int,
+                time: Time = SystemTime,
+                var inSyncReplicas: Set[Replica] = Set.empty[Replica]) extends Logging {
+  private var leaderReplicaId: Option[Int] = None
+  private var assignedReplicas: Set[Replica] = Set.empty[Replica]
+  private var highWatermarkUpdateTime: Long = -1L
+  private val leaderISRUpdateLock = new ReentrantLock()
+
+  def leaderId(newLeader: Option[Int] = None): Option[Int] = {
+    try {
+      leaderISRUpdateLock.lock()
+      if(newLeader.isDefined) {
+        info("Updating leader for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get))
+        leaderReplicaId = newLeader
+      }
+      leaderReplicaId
+    }finally {
+      leaderISRUpdateLock.unlock()
+    }
+  }
+
+  def assignedReplicas(replicas: Option[Set[Replica]] = None): Set[Replica] = {
+    replicas match {
+      case Some(ar) =>
+        assignedReplicas = ar
+      case None =>
+    }
+    assignedReplicas
+  }
+
+  def getReplica(replicaId: Int): Option[Replica] = assignedReplicas().find(_.brokerId == replicaId)
+
+  def addReplica(replica: Replica): Boolean = {
+    if(!assignedReplicas.contains(replica)) {
+      assignedReplicas += replica
+      true
+    }else false
+  }
+
+  def updateReplicaLEO(replica: Replica, leo: Long) {
+    replica.leoUpdateTime = time.milliseconds
+    replica.logEndOffset(Some(leo))
+    debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId))
+  }
+
+  def leaderReplica(): Replica = {
+    val leaderReplicaId = leaderId()
+    if(leaderReplicaId.isDefined) {
+      val leaderReplica = assignedReplicas().find(_.brokerId == leaderReplicaId.get)
+      if(leaderReplica.isDefined) leaderReplica.get
+      else throw new IllegalStateException("No replica for leader %d in the replica manager"
+        .format(leaderReplicaId.get))
+    }else
+      throw new NoLeaderForPartitionException("Leader for topic %s partition %d does not exist"
+        .format(topic, partitionId))
+  }
+
+  def leaderHW(newHw: Option[Long] = None): Long = {
+    newHw match {
+      case Some(highWatermark) =>
+        leaderReplica().highWatermark(newHw)
+        highWatermarkUpdateTime = time.milliseconds
+        highWatermark
+      case None =>
+        leaderReplica().highWatermark()
+    }
+  }
+
+  def hwUpdateTime: Long = highWatermarkUpdateTime
+
+  def getOutOfSyncReplicas(keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = {
+    /**
+     * there are two cases that need to be handled here -
+     * 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated
+     *                     for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
+     * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the
+     *                     follower is not catching up and should be removed from the ISR
+    **/
+    // Case 1 above
+    val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset() < leaderReplica().logEndOffset())
+    info("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId,
+      possiblyStuckReplicas.map(_.brokerId).mkString(",")))
+    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime() < (time.milliseconds - keepInSyncTimeMs))
+    info("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
+    val leader = leaderReplica()
+    // Case 2 above
+    val slowReplicas = inSyncReplicas.filter(r => (leader.logEndOffset() - r.logEndOffset()) > keepInSyncBytes)
+    info("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
+    stuckReplicas ++ slowReplicas
+  }
+
+  def updateISR(newISR: Set[Int], zkClientOpt: Option[ZkClient] = None) {
+    try {
+      leaderISRUpdateLock.lock()
+      zkClientOpt match {
+        case Some(zkClient) =>
+          // update ISR in ZK
+          updateISRInZk(newISR, zkClient)
+        case None =>
+      }
+      // update partition's ISR in cache
+      inSyncReplicas = newISR.map {r =>
+        getReplica(r) match {
+          case Some(replica) => replica
+          case None => throw new IllegalStateException("ISR update failed. No replica for id %d".format(r))
+        }
+      }
+      info("Updated ISR for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
+    }catch {
+      case e => throw new IllegalStateException("Failed to update ISR for topic %s ".format(topic) +
+        "partition %d to %s".format(partitionId, newISR.mkString(",")), e)
+    }finally {
+      leaderISRUpdateLock.unlock()
+    }
+  }
+
+  private def updateISRInZk(newISR: Set[Int], zkClient: ZkClient) = {
+    val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString))
+    if(replicaListAndEpochString == null) {
+      throw new NoLeaderForPartitionException(("Illegal partition state. ISR cannot be updated for topic " +
+        "%s partition %d since leader and ISR does not exist in ZK".format(topic, partitionId)))
+    }
+    else {
+      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+      val epoch = replicasAndEpochInfo.last
+      updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString),
+        "%s;%s".format(newISR.mkString(","), epoch))
+      info("Updating ISR for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(",")))
+    }
+  }
+
+  override def equals(that: Any): Boolean = {
+    if(!(that.isInstanceOf[Partition]))
+      return false
+    val other = that.asInstanceOf[Partition]
+    if(topic.equals(other.topic) && partitionId == other.partitionId)
+      return true
+    false
+  }
+
+  override def hashCode(): Int = {
+    31 + topic.hashCode() + 17*partitionId
+  }
+
+  override def toString(): String = {
+    val partitionString = new StringBuilder
+    partitionString.append("Topic: " + topic)
+    partitionString.append("; Partition: " + partitionId)
+    partitionString.append("; Leader: " + leaderId())
+    partitionString.append("; Assigned replicas: " + assignedReplicas().map(_.brokerId).mkString(","))
+    partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+    partitionString.toString()
+  }
+}

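The rewritten Partition centralizes ISR maintenance: getOutOfSyncReplicas flags stuck followers (no leo update for keepInSyncTimeMs) and slow followers (more than keepInSyncBytes behind the leader's leo), while updateISR persists a shrunken ISR to ZooKeeper and the local cache under leaderISRUpdateLock. A rough leader-side usage sketch (the object and method names are hypothetical; only the Partition calls come from this patch):

    import org.I0Itec.zkclient.ZkClient
    import kafka.cluster.Partition

    // Hypothetical ISR-expiration pass for a single partition led by this broker.
    object IsrExpirationSketch {
      def maybeShrinkIsr(partition: Partition, zkClient: ZkClient,
                         keepInSyncTimeMs: Long, keepInSyncBytes: Long) {
        val outOfSync = partition.getOutOfSyncReplicas(keepInSyncTimeMs, keepInSyncBytes)
        if(!outOfSync.isEmpty) {
          // drop the lagging brokers from the current ISR, persisting the new ISR to ZK and the local cache
          val newISR = partition.inSyncReplicas.map(_.brokerId) -- outOfSync.map(_.brokerId)
          partition.updateISR(newISR, Some(zkClient))
        }
      }
    }
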
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Fri Jun  1 01:53:19 2012
@@ -18,6 +18,123 @@
 package kafka.cluster
 
 import kafka.log.Log
+import kafka.server.{KafkaConfig, ReplicaFetcherThread}
+import java.lang.IllegalStateException
+import kafka.utils.Logging
 
-case class Replica(brokerId: Int, partition: Partition, topic: String,
-                   var log: Option[Log] = None, var hw: Long = -1, var leo: Long = -1, isLocal: Boolean = false)
\ No newline at end of file
+class Replica(val brokerId: Int,
+              val partition: Partition,
+              val topic: String,
+              var log: Option[Log] = None,
+              var leoUpdateTime: Long = -1L) extends Logging {
+  private var logEndOffset: Long = -1L
+  private var replicaFetcherThread: ReplicaFetcherThread = null
+
+  def logEndOffset(newLeo: Option[Long] = None): Long = {
+    isLocal match {
+      case true =>
+        newLeo match {
+          case Some(newOffset) => throw new IllegalStateException("Trying to set the leo %d for local log".format(newOffset))
+          case None => log.get.logEndOffset
+        }
+      case false =>
+        newLeo match {
+          case Some(newOffset) =>
+            logEndOffset = newOffset
+            logEndOffset
+          case None => logEndOffset
+        }
+    }
+  }
+
+  def logEndOffsetUpdateTime(time: Option[Long] = None): Long = {
+    time match {
+      case Some(t) =>
+        leoUpdateTime = t
+        leoUpdateTime
+      case None =>
+        leoUpdateTime
+    }
+  }
+
+  def isLocal: Boolean = {
+    log match {
+      case Some(l) => true
+      case None => false
+    }
+  }
+
+  def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = {
+    highwaterMarkOpt match {
+      case Some(highwaterMark) =>
+        isLocal match {
+          case true =>
+            trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId,
+                                                                                   brokerId, highwaterMark))
+            log.get.setHW(highwaterMark)
+            highwaterMark
+          case false => throw new IllegalStateException("Unable to set highwatermark for topic %s ".format(topic) +
+            "partition %d on broker %d, since there is no local log for this partition"
+              .format(partition.partitionId, brokerId))
+        }
+      case None =>
+        isLocal match {
+          case true =>
+            log.get.highwaterMark
+          case false => throw new IllegalStateException("Unable to get highwatermark for topic %s ".format(topic) +
+            "partition %d on broker %d, since there is no local log for this partition"
+              .format(partition.partitionId, brokerId))
+        }
+    }
+  }
+
+  def startReplicaFetcherThread(leaderBroker: Broker, config: KafkaConfig) {
+    val name = "Replica-Fetcher-%d-%s-%d".format(brokerId, topic, partition.partitionId)
+    replicaFetcherThread = new ReplicaFetcherThread(name, this, leaderBroker, config)
+    replicaFetcherThread.setDaemon(true)
+    replicaFetcherThread.start()
+  }
+
+  def stopReplicaFetcherThread() {
+    if(replicaFetcherThread != null) {
+      replicaFetcherThread.shutdown()
+      replicaFetcherThread = null
+    }
+  }
+
+  def getIfFollowerAndLeader(): (Boolean, Int) = {
+    replicaFetcherThread != null match {
+      case true => (true, replicaFetcherThread.getLeader().id)
+      case false => (false, -1)
+    }
+  }
+
+  def close() {
+    if(replicaFetcherThread != null)
+      replicaFetcherThread.shutdown()
+  }
+
+  override def equals(that: Any): Boolean = {
+    if(!(that.isInstanceOf[Replica]))
+      return false
+    val other = that.asInstanceOf[Replica]
+    if(topic.equals(other.topic) && brokerId == other.brokerId && partition.equals(other.partition))
+      return true
+    false
+  }
+
+  override def hashCode(): Int = {
+    31 + topic.hashCode() + 17*brokerId + partition.hashCode()
+  }
+
+
+  override def toString(): String = {
+    val replicaString = new StringBuilder
+    replicaString.append("ReplicaId: " + brokerId)
+    replicaString.append("; Topic: " + topic)
+    replicaString.append("; Partition: " + partition.toString)
+    replicaString.append("; isLocal: " + isLocal)
+    if(isLocal) replicaString.append("; Highwatermark: " + highWatermark())
+    replicaString.toString()
+  }
+}

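Replica now distinguishes local replicas, whose log end offset and high watermark are read from the local Log, from remote replicas, whose log end offset is a value the leader caches. Partition.updateReplicaLEO above performs exactly the pair of updates sketched here; the surrounding object and method are hypothetical:

    import kafka.cluster.Replica

    // Hypothetical leader-side bookkeeping after serving a follower fetch. Only valid for a
    // remote replica: a local replica derives its log end offset from its own log and would throw.
    object FollowerProgressSketch {
      def recordFollowerProgress(replica: Replica, fetchOffset: Long, now: Long) {
        replica.logEndOffset(Some(fetchOffset))      // cache the follower's log end offset on the leader
        replica.logEndOffsetUpdateTime(Some(now))    // consulted later by Partition.getOutOfSyncReplicas
      }
    }
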
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Fri Jun  1 01:53:19 2012
@@ -20,6 +20,7 @@ package kafka.common
 import kafka.message.InvalidMessageException
 import java.nio.ByteBuffer
 import java.lang.Throwable
+import scala.Predef._
 
 /**
  * A bi-directional mapping between error codes and exceptions x  
@@ -31,21 +32,23 @@ object ErrorMapping {
   val NoError = 0
   val OffsetOutOfRangeCode = 1
   val InvalidMessageCode = 2
-  val WrongPartitionCode = 3
+  val InvalidPartitionCode = 3
   val InvalidFetchSizeCode = 4
   val InvalidFetchRequestFormatCode = 5
-  val NoLeaderForPartitionCode = 6
-  val NotLeaderForPartitionCode = 7
+  val NotLeaderForPartitionCode = 6
+  val NoLeaderForPartitionCode = 7
+  val UnknownTopicCode = 8
 
   private val exceptionToCode = 
     Map[Class[Throwable], Int](
       classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
-      classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode,
+      classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> InvalidPartitionCode,
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
       classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
-      classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
-      classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode
+      classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
+      classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode
+//      classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala Fri Jun  1 01:53:19 2012
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package kafka.common
 
 /**
- * Exception raised when broker receives a produce message for partition it does not lead
- * @param message - A more detailed and descriptive error message
+ * Thrown when a request is made for a partition on a broker that is NOT the leader for that partition
  */
-class NotLeaderForPartitionException(message: String) extends Exception(message) {
+class NotLeaderForPartitionException(message: String) extends RuntimeException(message) {
   def this() = this(null)
-}
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicException.scala?rev=1344964&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/UnknownTopicException.scala Fri Jun  1 01:53:19 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a request is made for a topic that hasn't been created in a Kafka cluster
+ */
+class UnknownTopicException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1344964&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Fri Jun  1 01:53:19 2012
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import kafka.api.OffsetRequest
+import java.io.{IOException, RandomAccessFile, File}
+import java.util.{Comparator, Collections, ArrayList}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
+import kafka.message.{MessageSet, InvalidMessageException, FileMessageSet}
+import kafka.utils._
+import java.text.NumberFormat
+import kafka.common.OffsetOutOfRangeException
+
+object Log {
+  val FileSuffix = ".kafka"
+  val hwFileName = "highwatermark"
+
+  /**
+   * Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
+   * but instead of checking for equality looks within the range. Takes the array size as an option in case
+   * the array grows while searching happens
+   *
+   * TODO: This should move into SegmentList.scala
+   */
+  def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = {
+    if(ranges.size < 1)
+      return None
+
+    // check out of bounds
+    if(value < ranges(0).start || value > ranges(arraySize - 1).start + ranges(arraySize - 1).size)
+      throw new OffsetOutOfRangeException("offset " + value + " is out of range")
+
+    // check at the end
+    if (value == ranges(arraySize - 1).start + ranges(arraySize - 1).size)
+      return None
+
+    var low = 0
+    var high = arraySize - 1
+    while(low <= high) {
+      val mid = (high + low) / 2
+      val found = ranges(mid)
+      if(found.contains(value))
+        return Some(found)
+      else if (value < found.start)
+        high = mid - 1
+      else
+        low = mid + 1
+    }
+    None
+  }
+
+  def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] =
+    findRange(ranges, value, ranges.length)
+
+  /**
+   * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
+   * so that ls sorts the files numerically
+   */
+  def nameFromOffset(offset: Long): String = {
+    val nf = NumberFormat.getInstance()
+    nf.setMinimumIntegerDigits(20)
+    nf.setMaximumFractionDigits(0)
+    nf.setGroupingUsed(false)
+    nf.format(offset) + FileSuffix
+  }
+
+  def getEmptyOffsets(request: OffsetRequest): Array[Long] = {
+    if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime)
+      return Array(0L)
+    else
+      return Array()
+  }
+}
+
+
+/**
+ * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
+ */
+class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
+  @volatile var deleted = false
+  def size: Long = messageSet.highWaterMark
+  override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
+}
+
+
+/**
+ * An append-only log for storing messages. 
+ */
+@threadsafe
+private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean)
+  extends Logging {
+
+  import kafka.log.Log._
+
+  /* A lock that guards all modifications to the log */
+  private val lock = new Object
+
+  /* The current number of unflushed messages appended to the write */
+  private val unflushed = new AtomicInteger(0)
+
+   /* last time it was flushed */
+  private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
+
+  /* The actual segments of the log */
+  private[log] val segments: SegmentList[LogSegment] = loadSegments()
+
+
+  /* create the leader highwatermark file handle */
+  private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
+
+  private var hw: Long = 0
+
+  private val logStats = new LogStats(this)
+
+  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
+
+  /* The name of this log */
+  def name  = dir.getName()
+
+  /* Load the log segments from the log files on disk */
+  private def loadSegments(): SegmentList[LogSegment] = {
+    // open all the segments read-only
+    val logSegments = new ArrayList[LogSegment]
+    val ls = dir.listFiles()
+    if(ls != null) {
+      for(file <- ls if file.isFile && file.toString.endsWith(FileSuffix)) {
+        if(!file.canRead)
+          throw new IOException("Could not read file " + file)
+        val filename = file.getName()
+        val start = filename.substring(0, filename.length - FileSuffix.length).toLong
+        val messageSet = new FileMessageSet(file, false)
+        logSegments.add(new LogSegment(file, messageSet, start))
+      }
+    }
+
+    if(logSegments.size == 0) {
+      // no existing segments, create a new mutable segment
+      val newFile = new File(dir, nameFromOffset(0))
+      val set = new FileMessageSet(newFile, true)
+      logSegments.add(new LogSegment(newFile, set, 0))
+    } else {
+      // there is at least one existing segment, validate and recover them/it
+      // sort segments into ascending order for fast searching
+      Collections.sort(logSegments, new Comparator[LogSegment] {
+        def compare(s1: LogSegment, s2: LogSegment): Int = {
+          if(s1.start == s2.start) 0
+          else if(s1.start < s2.start) -1
+          else 1
+        }
+      })
+      validateSegments(logSegments)
+
+      //make the final section mutable and run recovery on it if necessary
+      val last = logSegments.remove(logSegments.size - 1)
+      last.messageSet.close()
+      info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
+      val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
+      logSegments.add(mutable)
+    }
+    new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
+  }
+
+  /**
+   * Check that the ranges and sizes add up, otherwise we have lost some data somewhere
+   */
+  private def validateSegments(segments: ArrayList[LogSegment]) {
+    lock synchronized {
+      for(i <- 0 until segments.size - 1) {
+        val curr = segments.get(i)
+        val next = segments.get(i+1)
+        if(curr.start + curr.size != next.start)
+          throw new IllegalStateException("The following segments don't validate: " +
+                  curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
+      }
+    }
+  }
+
+  /**
+   * The number of segments in the log
+   */
+  def numberOfSegments: Int = segments.view.length
+
+  /**
+   * Close this log
+   */
+  def close() {
+    debug("Closing log " + name)
+    lock synchronized {
+      for(seg <- segments.view) {
+        info("Closing log segment " + seg.file.getAbsolutePath)
+        seg.messageSet.close()
+      }
+      checkpointHW()
+      hwFile.close()
+    }
+  }
+
+  /**
+   * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
+   * Returns the offset at which the messages are written.
+   */
+  def append(messages: MessageSet): Unit = {
+    // validate the messages
+    var numberOfMessages = 0
+    for(messageAndOffset <- messages) {
+      if(!messageAndOffset.message.isValid)
+        throw new InvalidMessageException()
+      numberOfMessages += 1;
+    }
+
+    logStats.recordAppendedMessages(numberOfMessages)
+    
+    // they are valid, insert them in the log
+    lock synchronized {
+      try {
+        val segment = segments.view.last
+        segment.messageSet.append(messages)
+        maybeFlush(numberOfMessages)
+        maybeRoll(segment)
+      }
+      catch {
+        case e: IOException =>
+          fatal("Halting due to unrecoverable I/O error while handling producer request", e)
+          Runtime.getRuntime.halt(1)
+        case e2 => throw e2
+      }
+    }
+  }
+
+  /**
+   * Read from the log file at the given offset
+   */
+  def read(offset: Long, length: Int): MessageSet = {
+    val view = segments.view
+    Log.findRange(view, offset, view.length) match {
+      case Some(segment) => segment.messageSet.read((offset - segment.start), length)
+      case _ => MessageSet.Empty
+    }
+  }
+
+  /**
+   * Delete any log segments matching the given predicate function
+   */
+  def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
+    lock synchronized {
+      val view = segments.view
+      val deletable = view.takeWhile(predicate)
+      for(seg <- deletable)
+        seg.deleted = true
+      val numToDelete = deletable.size
+      // if we are deleting everything, create a new empty segment
+      if(numToDelete == view.size) {
+        roll()
+      }
+      segments.trunc(numToDelete)
+    }
+  }
+
+  /**
+   * Get the size of the log in bytes
+   */
+  def size: Long =
+    segments.view.foldLeft(0L)(_ + _.size)
+
+  /**
+   * The byte offset of the message that will be appended next.
+   */
+  def nextAppendOffset: Long = {
+    flush
+    val last = segments.view.last
+    last.start + last.size
+  }
+
+  /**
+   *  get the current high watermark of the log
+   */
+  def highwaterMark: Long = segments.view.last.messageSet.highWaterMark
+
+  /**
+   *  get the offset of the last message in the log
+   */
+  def logEndOffset: Long = segments.view.last.messageSet.getEndOffset()
+
+  /**
+   * Roll the log over if necessary
+   */
+  private def maybeRoll(segment: LogSegment) {
+    if(segment.messageSet.sizeInBytes > maxSize)
+      roll()
+  }
+
+  /**
+   * Create a new segment and make it active
+   */
+  def roll() {
+    lock synchronized {
+      val last = segments.view.last
+      val newOffset = nextAppendOffset
+      val newFile = new File(dir, nameFromOffset(newOffset))
+      debug("Rolling log '" + name + "' to " + newFile.getName())
+      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+    }
+  }
+
+  /**
+   * Flush the log if necessary
+   */
+  private def maybeFlush(numberOfMessages : Int) {
+    if(unflushed.addAndGet(numberOfMessages) >= flushInterval) {
+      flush()
+    }
+  }
+
+  /**
+   * Flush this log file to the physical disk
+   */
+  def flush() : Unit = {
+    if (unflushed.get == 0) return
+
+    lock synchronized {
+      debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
+          System.currentTimeMillis)
+      segments.view.last.messageSet.flush()
+      unflushed.set(0)
+      lastflushedTime.set(System.currentTimeMillis)
+      checkpointHW()
+     }
+  }
+
+  def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
+    val segsArray = segments.view
+    var offsetTimeArray: Array[Tuple2[Long, Long]] = null
+    if (segsArray.last.size > 0)
+      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
+    else
+      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)
+
+    for (i <- 0 until segsArray.length)
+      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
+    if (segsArray.last.size > 0)
+      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
+
+    var startIndex = -1
+    request.time match {
+      case OffsetRequest.LatestTime =>
+        startIndex = offsetTimeArray.length - 1
+      case OffsetRequest.EarliestTime =>
+        startIndex = 0
+      case _ =>
+          var isFound = false
+          debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
+          startIndex = offsetTimeArray.length - 1
+          while (startIndex >= 0 && !isFound) {
+            if (offsetTimeArray(startIndex)._2 <= request.time)
+              isFound = true
+            else
+              startIndex -=1
+          }
+    }
+
+    val retSize = request.maxNumOffsets.min(startIndex + 1)
+    val ret = new Array[Long](retSize)
+    for (j <- 0 until retSize) {
+      ret(j) = offsetTimeArray(startIndex)._1
+      startIndex -= 1
+    }
+    ret
+  }
+
+  /* Attempts to delete all provided segments from a log and returns how many it was able to */
+  def deleteSegments(segments: Seq[LogSegment]): Int = {
+    var total = 0
+    for(segment <- segments) {
+      info("Deleting log segment " + segment.file.getName() + " from " + name)
+      swallow(segment.messageSet.close())
+      if(!segment.file.delete()) {
+        warn("Delete failed.")
+      } else {
+        total += 1
+      }
+    }
+    total
+  }
+
+  def recoverUptoLastCheckpointedHW() {
+    if(hwFile.length() > 0) {
+      // read the last checkpointed hw from disk
+      hwFile.seek(0)
+      val lastKnownHW = hwFile.readLong()
+      // find the log segment that has this hw
+      val segmentToBeTruncated = segments.view.find(segment =>
+        lastKnownHW >= segment.start && lastKnownHW < segment.messageSet.getEndOffset())
+
+      segmentToBeTruncated match {
+        case Some(segment) =>
+          val truncatedSegmentIndex = segments.view.indexOf(segment)
+          segments.truncLast(truncatedSegmentIndex)
+        case None =>
+      }
+
+      segmentToBeTruncated match {
+        case Some(segment) =>
+          segment.messageSet.truncateUpto(lastKnownHW)
+          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, hw))
+        case None =>
+          assert(lastKnownHW <= segments.view.last.messageSet.size,
+            "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
+              format(lastKnownHW, segments.view.last.messageSet.size, segments.view.last.file.getAbsolutePath))
+          error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
+            .format(lastKnownHW, segments.view.head.start, segments.view.last.messageSet.size))
+      }
+
+      val segmentsToBeDeleted = segments.view.filter(segment => segment.start >= lastKnownHW)
+      if(segmentsToBeDeleted.size < segments.view.size) {
+      val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
+      if(numSegmentsDeleted != segmentsToBeDeleted.size)
+        error("Failed to delete some segments during log recovery")
+      }
+    }else
+      info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
+  }
+
+  def setHW(latestLeaderHW: Long) {
+    hw = latestLeaderHW
+  }
+
+  def checkpointHW() {
+    hwFile.seek(0)
+    hwFile.writeLong(hw)
+    hwFile.getChannel.force(true)
+  }
+
+  def topicName():String = {
+    name.substring(0, name.lastIndexOf("-"))
+  }
+
+  def getLastFlushedTime():Long = {
+    return lastflushedTime.get
+  }
+}
+  

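The new Log owns a per-log "highwatermark" file: setHW records the latest high watermark received from the leader, flush() and close() persist it via checkpointHW(), and recoverUptoLastCheckpointedHW() truncates back to that checkpoint after an unclean shutdown. A rough sketch of the intended lifecycle (the object and method names are hypothetical; only the Log calls come from this patch):

    package kafka.log

    object HighWatermarkSketch {
      // On a follower's fetch path: remember the hw the leader sent, then flush;
      // flush() also checkpoints the hw to the "highwatermark" file when it writes.
      def onFetchResponse(log: Log, leaderHw: Long) {
        log.setHW(leaderHw)
        log.flush()
      }

      // On broker startup: drop anything past the last checkpointed hw before
      // resuming replication from the leader.
      def onStartup(log: Log) {
        log.recoverUptoLastCheckpointedHW()
      }
    }
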
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Fri Jun  1 01:53:19 2012
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLat
 import kafka.server.KafkaConfig
 import kafka.common.{InvalidTopicException, InvalidPartitionException}
 import kafka.api.OffsetRequest
+import kafka.log.Log._
 
 /**
  * The guy who creates and hands out logs
@@ -144,7 +145,7 @@ private[kafka] class LogManager(val conf
     val log = getLog(offsetRequest.topic, offsetRequest.partition)
     log match {
       case Some(l) => l.getOffsetsBefore(offsetRequest)
-      case None => Log.getEmptyOffsets(offsetRequest)
+      case None => getEmptyOffsets(offsetRequest)
     }
   }
 
@@ -190,28 +191,13 @@ private[kafka] class LogManager(val conf
     log
   }
 
-  /* Attemps to delete all provided segments from a log and returns how many it was able to */
-  private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
-    var total = 0
-    for(segment <- segments) {
-      info("Deleting log segment " + segment.file.getName() + " from " + log.name)
-      swallow(segment.messageSet.close())
-      if(!segment.file.delete()) {
-        warn("Delete failed.")
-      } else {
-        total += 1
-      }
-    }
-    total
-  }
-
   /* Runs through the log removing segments older than a certain age */
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
-    val topic = Utils.getTopicPartition(log.dir.getName)._1
+    val topic = Utils.getTopicPartition(log.name)._1
     val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
     val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
-    val total = deleteSegments(log, toBeDeleted)
+    val total = log.deleteSegments(toBeDeleted)
     total
   }
 
@@ -231,7 +217,7 @@ private[kafka] class LogManager(val conf
       }
     }
     val toBeDeleted = log.markDeletedWhile( shouldDelete )
-    val total = deleteSegments(log, toBeDeleted)
+    val total = log.deleteSegments(toBeDeleted)
     total
   }
 
@@ -292,16 +278,16 @@ private[kafka] class LogManager(val conf
       try{
         val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
         var logFlushInterval = config.defaultFlushIntervalMs
-        if(logFlushIntervalMap.contains(log.getTopicName))
-          logFlushInterval = logFlushIntervalMap(log.getTopicName)
-        debug(log.getTopicName + " flush interval  " + logFlushInterval +
+        if(logFlushIntervalMap.contains(log.topicName))
+          logFlushInterval = logFlushIntervalMap(log.topicName)
+        debug(log.topicName + " flush interval  " + logFlushInterval +
             " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       }
       catch {
         case e =>
-          error("Error flushing topic " + log.getTopicName, e)
+          error("Error flushing topic " + log.topicName, e)
           e match {
             case _: IOException =>
               fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala Fri Jun  1 01:53:19 2012
@@ -36,7 +36,7 @@ class LogStats(val log: Log) extends Log
   
   def getNumberOfSegments: Int = log.numberOfSegments
   
-  def getCurrentOffset: Long = log.getHighwaterMark
+  def getCurrentOffset: Long = log.highwaterMark
   
   def getNumAppendedMessages: Long = numCumulatedMessages.get
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala Fri Jun  1 01:53:19 2012
@@ -72,7 +72,29 @@ private[log] class SegmentList[T](seq: S
     }
     deleted
   }
-  
+
+  /**
+   * Delete the items from position newEnd until end of list
+   */
+  def truncLast(newEnd: Int): Seq[T] = {
+    if(newEnd >= contents.get().size-1)
+      throw new IllegalArgumentException("End index must be segment list size - 1");
+    var deleted: Array[T] = null
+    var done = false
+    while(!done) {
+      val curr = contents.get()
+      val newLength = newEnd + 1
+      val updated = new Array[T](newLength)
+      Array.copy(curr, 0, updated, 0, newLength)
+      if(contents.compareAndSet(curr, updated)) {
+        deleted = new Array[T](curr.length - newLength)
+        Array.copy(curr, newEnd + 1, deleted, 0, curr.length - newLength)
+        done = true
+      }
+    }
+    deleted
+  }
+
   /**
    * Get a consistent view of the sequence
    */

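truncLast complements the existing trunc: it atomically keeps elements 0 through newEnd and returns the removed tail, which is how Log.recoverUptoLastCheckpointedHW drops segments that lie wholly beyond the checkpointed high watermark. A toy illustration using Int stand-ins for log segments (illustrative only, not part of this patch):

    package kafka.log

    object TruncLastSketch {
      def demo() {
        val segments = new SegmentList(Seq(10, 20, 30, 40))  // stand-ins for segment start offsets
        val deleted = segments.truncLast(1)                   // keep indices 0 and 1
        assert(segments.view.toSeq == Seq(10, 20))
        assert(deleted.toSeq == Seq(30, 40))
      }
    }
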
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Fri Jun  1 01:53:19 2012
@@ -146,6 +146,8 @@ class FileMessageSet private[kafka](priv
     */
   def highWaterMark(): Long = setHighWaterMark.get()
 
+  def getEndOffset(): Long = offset + sizeInBytes()
+
   def checkMutable(): Unit = {
     if(!mutable)
       throw new IllegalStateException("Attempt to invoke mutation on immutable message set.")
@@ -208,7 +210,13 @@ class FileMessageSet private[kafka](priv
     needRecover.set(false)    
     len - validUpTo
   }
-  
+
+  def truncateUpto(hw: Long) = {
+    channel.truncate(hw)
+    setSize.set(hw)
+    setHighWaterMark.set(hw)
+  }
+
   /**
    * Read, validate, and discard a single message, returning the next valid offset, and
    * the message being validated

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Fri Jun  1 01:53:19 2012
@@ -64,6 +64,7 @@ class SocketServer(val port: Int,
    * Shutdown the socket server
    */
   def shutdown() = {
+    info("Shutting down socket server")
     acceptor.shutdown
     for(processor <- processors)
       processor.shutdown

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Fri Jun  1 01:53:19 2012
@@ -55,14 +55,14 @@ class BrokerPartitionInfo(producerPool: 
       m.leader match {
         case Some(leader) =>
           val leaderReplica = new Replica(leader.id, partition, topic)
-          partition.leader = Some(leaderReplica)
+          partition.leaderId(Some(leaderReplica.brokerId))
           debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id))
           partition
         case None =>
           debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId))
           partition
       }
-    }.sortWith((s, t) => s.partId < t.partId)
+    }.sortWith((s, t) => s.partitionId < t.partitionId)
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Fri Jun  1 01:53:19 2012
@@ -98,11 +98,8 @@ class DefaultEventHandler[K,V](config: P
       val partitionIndex = getPartition(event.getKey, totalNumPartitions)
       val brokerPartition = topicPartitionsList(partitionIndex)
 
-      val leaderBrokerId = brokerPartition.leader match {
-        case Some(leader) => leader.brokerId
-        case None => -1
-        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
-      }
+      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
+      val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
 
       var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
       ret.get(leaderBrokerId) match {
@@ -113,7 +110,7 @@ class DefaultEventHandler[K,V](config: P
           ret.put(leaderBrokerId, dataPerBroker)
       }
 
-      val topicAndPartition = (event.getTopic, brokerPartition.partId)
+      val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
       var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
       dataPerBroker.get(topicAndPartition) match {
         case Some(element) =>
@@ -131,7 +128,7 @@ class DefaultEventHandler[K,V](config: P
     debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
     val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
     debug("Broker partitions registered for topic: %s are %s"
-      .format(pd.getTopic, topicPartitionsList.map(p => p.partId).mkString(",")))
+      .format(pd.getTopic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
     topicPartitionsList

Propchange: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Jun  1 01:53:19 2012
@@ -0,0 +1 @@
+FollowerReplica.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Fri Jun  1 01:53:19 2012
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.io.IOException
-import java.lang.IllegalStateException
 import java.util.concurrent.atomic._
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
@@ -30,12 +29,14 @@ import kafka.utils.{SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import scala.math._
+import java.lang.IllegalStateException
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
-  
+class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
+                val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
+
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -74,7 +75,7 @@ class KafkaApis(val requestChannel: Requ
     }
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
-       val topicData = readMessageSets(fetchReq.fetch.offsetInfo)
+       val topicData = readMessageSets(fetchReq.fetch)
        val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
        requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     }
@@ -93,10 +94,10 @@ class KafkaApis(val requestChannel: Requ
       for(partitionData <- topicData.partitionData) {
         msgIndex += 1
         try {
-          // TODO: need to handle ack's here!  Will probably move to another method.
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+          replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition)
           offsets(msgIndex) = log.nextAppendOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
@@ -127,18 +128,24 @@ class KafkaApis(val requestChannel: Requ
 
     // validate the request
     try {
-      fetchRequest.validate()  
+      fetchRequest.validate()
     } catch {
       case e:FetchRequestFormatException =>
         val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, Array.empty)
-        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.InvalidFetchRequestFormatCode), -1)
+        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response,
+          ErrorMapping.InvalidFetchRequestFormatCode), -1)
         requestChannel.sendResponse(channelResponse)
     }
-    
+
+    if(fetchRequest.replicaId != -1)
+      maybeUpdatePartitionHW(fetchRequest)
+
     // if there are enough bytes available right now we can answer the request, otherwise we have to punt
     val availableBytes = availableFetchBytes(fetchRequest)
     if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
-      val topicData = readMessageSets(fetchRequest.offsetInfo)
+      val topicData = readMessageSets(fetchRequest)
+      debug("Returning fetch response %s for fetch request with correlation id %d"
+        .format(topicData.map(_.partitionData.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     } else {
@@ -157,34 +164,71 @@ class KafkaApis(val requestChannel: Requ
     for(offsetDetail <- fetchRequest.offsetInfo) {
       for(i <- 0 until offsetDetail.partitions.size) {
         try {
-          val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i)) 
+          debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
+          val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
           val available = maybeLog match {
-            case Some(log) => max(0, log.getHighwaterMark - offsetDetail.offsets(i))
+            case Some(log) => max(0, log.highwaterMark - offsetDetail.offsets(i))
             case None => 0
           }
-    	  totalBytes += math.min(offsetDetail.fetchSizes(i), available)
+          totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
-          case e: InvalidPartitionException => 
+          case e: InvalidPartitionException =>
             info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
         }
       }
     }
     totalBytes
   }
-  
+
+  private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
+    val offsets = fetchRequest.offsetInfo
+
+    for(offsetDetail <- offsets) {
+      val topic = offsetDetail.topic
+      val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
+      for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
+        replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset,
+          kafkaZookeeper.getZookeeperClient)
+      }
+    }
+  }
+
   /**
    * Read from all the offset details given and produce an array of topic datas
    */
-  private def readMessageSets(offsets: Seq[OffsetDetail]): Array[TopicData] = {
+  private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = {
+    val offsets = fetchRequest.offsetInfo
     val fetchedData = new mutable.ArrayBuffer[TopicData]()
+
     for(offsetDetail <- offsets) {
       val info = new mutable.ArrayBuffer[PartitionData]()
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
         val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
-          case Left(err) => new PartitionData(partition, err, offset, MessageSet.Empty)
-          case Right(messages) => new PartitionData(partition, ErrorMapping.NoError, offset, messages)
+          case Left(err) =>
+            fetchRequest.replicaId match {
+              case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+              case _ =>
+                new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+            }
+          case Right(messages) =>
+            val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
+            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) +
+              " must exist on leader broker %d".format(logManager.config.brokerId))
+            val leaderReplica = leaderReplicaOpt.get
+            fetchRequest.replicaId match {
+              case -1 => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
+                new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
+              case _ => // fetch request from a follower
+                val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
+                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d"
+                  .format(fetchRequest.replicaId, replicaManager.config.brokerId))
+                val replica = replicaOpt.get
+                debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+                  .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
+            }
         }
         info.append(partitionInfo)
       }
@@ -192,13 +236,15 @@ class KafkaApis(val requestChannel: Requ
     }
     fetchedData.toArray
   }
-  
+
   /**
    * Read from a single topic/partition at the given offset
    */
   private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = {
     var response: Either[Int, MessageSet] = null
     try {
+      // check if the current broker is the leader for the partition
+      kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
       trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
       val log = logManager.getLog(topic, partition)
       response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
@@ -259,6 +305,10 @@ class KafkaApis(val requestChannel: Requ
     info("Sending response for topic metadata request")
     requestChannel.sendResponse(new RequestChannel.Response(request, new TopicMetadataSend(topicsMetadata), -1))
   }
+
+  def close() {
+    fetchRequestPurgatory.shutdown()
+  }
   
   /**
    * A delayed fetch request
@@ -285,7 +335,7 @@ class KafkaApis(val requestChannel: Requ
      * When a request expires just answer it with whatever data is present
      */
     def expire(delayed: DelayedFetch) {
-      val topicData = readMessageSets(delayed.fetch.offsetInfo)
+      val topicData = readMessageSets(delayed.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     }
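
For context, the fetch path above now branches on fetchRequest.replicaId: -1 marks a
fetch from an external consumer, while any other value identifies a follower broker
whose fetch offset the leader records (maybeUpdatePartitionHW) before answering, and
both kinds of fetch receive the leader's high watermark in each PartitionData. A
minimal sketch of that rule, with simplified stand-in types rather than the actual
request classes:

    // -1 = external consumer; anything else = a follower replica whose position the
    // leader records so the partition high watermark can advance.
    case class SimpleFetch(replicaId: Int, topic: String, partition: Int, offset: Long)

    def serveFetch(fetch: SimpleFetch,
                   leaderHighWatermark: Long,
                   recordFollowerPosition: (Int, Long) => Unit): Long = {
      if (fetch.replicaId != -1)
        recordFollowerPosition(fetch.replicaId, fetch.offset)
      leaderHighWatermark   // returned to consumers and followers alike
    }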

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Jun  1 01:53:19 2012
@@ -20,6 +20,7 @@ package kafka.server
 import java.util.Properties
 import kafka.utils.{Utils, ZKConfig}
 import kafka.message.Message
+import kafka.consumer.ConsumerConfig
 
 /**
  * Configuration settings for the kafka server
@@ -105,7 +106,27 @@ class KafkaConfig(props: Properties) ext
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
 
+  val keepInSyncTimeMs = Utils.getLong(props, "isr.in.sync.time.ms", 30000)
+
+  val keepInSyncBytes = Utils.getLong(props, "isr.in.sync.bytes", 4000)
+
   /* size of the state change request queue in Zookeeper */
   val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
 
+  /**
+   * Config options relevant to a follower for a replica
+   */
+  /** the socket timeout for network requests */
+  val replicaSocketTimeoutMs = Utils.getInt(props, "replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
+
+  /** the socket receive buffer for network requests */
+  val replicaSocketBufferSize = Utils.getInt(props, "replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
+
+  /** the number of bytes of messages to attempt to fetch */
+  val replicaFetchSize = Utils.getInt(props, "replica.fetch.size", ConsumerConfig.FetchSize)
+
+  val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
+
+  val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)
+
  }
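
As a rough illustration of the new follower-side options (the keys come from the diff
above; the values and the omitted broker settings are placeholders, not recommendations):

    import java.util.Properties

    // Sketch only: a real broker config also needs brokerid, log.dir, zk.connect, etc.
    val props = new Properties()
    props.put("replica.socket.timeout.ms", "30000")   // follower socket timeout
    props.put("replica.socket.buffersize", "65536")   // follower socket receive buffer
    props.put("replica.fetch.size", "1048576")        // bytes to fetch per request
    props.put("replica.fetch.wait.time.ms", "500")    // max wait on the leader
    props.put("replica.fetch.min.bytes", "4096")      // min bytes before responding
    // val config = new KafkaConfig(props)            // valid once the required settings are added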

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Fri Jun  1 01:53:19 2012
@@ -42,7 +42,7 @@ class KafkaRequestHandler(val requestCha
 
 class KafkaRequestHandlerPool(val requestChannel: RequestChannel, 
                               val apis: KafkaApis, 
-                              numThreads: Int) { 
+                              numThreads: Int) extends Logging {
   
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
@@ -53,10 +53,12 @@ class KafkaRequestHandlerPool(val reques
   }
   
   def shutdown() {
+    info("Shutting down request handlers")
     for(handler <- runnables)
       handler.shutdown
     for(thread <- threads)
       thread.join
+    info("Request handlers shut down")
   }
   
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Jun  1 01:53:19 2012
@@ -21,15 +21,16 @@ import java.io.File
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
-import kafka.cluster.Replica
 import java.util.concurrent._
 import atomic.AtomicBoolean
+import kafka.cluster.Replica
+import org.I0Itec.zkclient.ZkClient
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig) extends Logging {
+class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
   private var isShuttingDown = new AtomicBoolean(false)
@@ -39,7 +40,8 @@ class KafkaServer(val config: KafkaConfi
   var requestHandlerPool: KafkaRequestHandlerPool = null
   private var logManager: LogManager = null
   var kafkaZookeeper: KafkaZooKeeper = null
-  val replicaManager = new ReplicaManager(config)
+  private var replicaManager: ReplicaManager = null
+  private var apis: KafkaApis = null
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
@@ -68,9 +70,11 @@ class KafkaServer(val config: KafkaConfi
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
 
-    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
+    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica, makeLeader, makeFollower)
+
+    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient)
 
-    val apis = new KafkaApis(socketServer.requestChannel, logManager, kafkaZookeeper)
+    apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
     requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
     socketServer.startup
 
@@ -84,6 +88,7 @@ class KafkaServer(val config: KafkaConfi
 
     // starting relevant replicas and leader election for partitions assigned to this broker
     kafkaZookeeper.startup
+
     info("Server started.")
   }
   
@@ -95,7 +100,9 @@ class KafkaServer(val config: KafkaConfi
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("Shutting down Kafka server with id " + config.brokerId)
-      kafkaZookeeper.close
+      apis.close()
+      if(replicaManager != null)
+        replicaManager.close()
       if (socketServer != null)
         socketServer.shutdown()
       if(requestHandlerPool != null)
@@ -103,6 +110,7 @@ class KafkaServer(val config: KafkaConfi
       Utils.unregisterMBean(statsMBeanName)
       if(logManager != null)
         logManager.close()
+      kafkaZookeeper.close
 
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
       debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
@@ -117,13 +125,23 @@ class KafkaServer(val config: KafkaConfi
    */
   def awaitShutdown(): Unit = shutdownLatch.await()
 
-  def addReplica(topic: String, partition: Int): Replica = {
+  def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = {
+    info("Added local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId))
     // get local log
     val log = logManager.getOrCreateLog(topic, partition)
-    replicaManager.addLocalReplica(topic, partition, log)
+    replicaManager.addLocalReplica(topic, partition, log, assignedReplicas)
+  }
+
+  def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+    replicaManager.makeLeader(replica, currentISRInZk)
+  }
+
+  def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
+    replicaManager.makeFollower(replica, leaderBrokerId, zkClient)
   }
 
-  def getReplica(topic: String, partition: Int): Option[Replica] = replicaManager.getReplica(topic, partition)
+  def getReplica(topic: String, partition: Int): Option[Replica] =
+    replicaManager.getReplica(topic, partition)
 
   def getLogManager(): LogManager = logManager
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1344964&r1=1344963&r2=1344964&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Fri Jun  1 01:53:19 2012
@@ -17,14 +17,15 @@
 
 package kafka.server
 
-import java.lang.{Thread, IllegalStateException}
 import java.net.InetAddress
-import kafka.admin.AdminUtils
 import kafka.cluster.Replica
-import kafka.common.{NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient}
 import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import kafka.admin.AdminUtils
+import java.lang.{Thread, IllegalStateException}
+import collection.mutable.HashSet
+import kafka.common.{InvalidPartitionException, NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient}
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -33,8 +34,10 @@ import org.I0Itec.zkclient.{IZkDataListe
  *
  */
 class KafkaZooKeeper(config: KafkaConfig,
-                     addReplicaCbk: (String, Int) => Replica,
-                     getReplicaCbk: (String, Int) => Option[Replica]) extends Logging {
+                     addReplicaCbk: (String, Int, Set[Int]) => Replica,
+                     getReplicaCbk: (String, Int) => Option[Replica],
+                     becomeLeader: (Replica, Seq[Int]) => Unit,
+                     becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   private var zkClient: ZkClient = null
@@ -109,6 +112,12 @@ class KafkaZooKeeper(config: KafkaConfig
   }
 
   def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
+    // TODO: KAFKA-352 first check if this topic exists in the cluster
+//    if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
+//      throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
+    // check if partition id is invalid
+    if(partition < 0)
+      throw new InvalidPartitionException("Partition %d is invalid".format(partition))
     ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
       case Some(leader) =>
         if(leader != config.brokerId)
@@ -179,8 +188,9 @@ class KafkaZooKeeper(config: KafkaConfig
   private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
     partitions.foreach { partition =>
       val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
+      info("Assigned replicas list for topic %s partition %d is %s".format(topic, partition, assignedReplicas.mkString(",")))
       if(assignedReplicas.contains(config.brokerId)) {
-        val replica = addReplicaCbk(topic, partition)
+        val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
         startReplica(replica)
       } else
         warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
@@ -189,43 +199,114 @@ class KafkaZooKeeper(config: KafkaConfig
   }
 
   private def startReplica(replica: Replica) {
-    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partId, replica.brokerId))
-    replica.log match {
-      case Some(log) =>  // log is already started
-      case None =>
-      // TODO: Add log recovery upto the last checkpointed HW as part of KAFKA-46
-    }
-    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partId) match {
-      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
+    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId,
+      replica.brokerId))
+    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,
+        leader))
+        // check if this broker is the leader, if not, then become follower
+        if(leader != config.brokerId)
+          becomeFollower(replica, leader, zkClient)
       case None => // leader election
         leaderElection(replica)
     }
   }
 
   def leaderElection(replica: Replica) {
-    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partId))
+    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partitionId))
     // read the AR list for replica.partition from ZK
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partId).map(r => r.toInt)
-    // TODO: read the ISR as part of KAFKA-302
-    if(assignedReplicas.contains(replica.brokerId)) {
+    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
+    val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
+    if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId,
+                       assignedReplicas, inSyncReplicas, liveBrokers)) {
+      info("Broker %d will participate in leader election for topic %s partition %d".format(config.brokerId, replica.topic,
+                                                                                           replica.partition.partitionId))
       // wait for some time if it is not the preferred replica
       try {
-        if(replica.brokerId != assignedReplicas.head)
-          Thread.sleep(config.preferredReplicaWaitTime)
+        if(replica.brokerId != assignedReplicas.head) {
+          // sleep only if the preferred replica is alive
+          if(liveBrokers.contains(assignedReplicas.head)) {
+            info("Preferred replica %d for topic %s ".format(assignedReplicas.head, replica.topic) +
+              "partition %d is alive. Waiting for %d ms to allow it to become leader"
+              .format(replica.partition.partitionId, config.preferredReplicaWaitTime))
+            Thread.sleep(config.preferredReplicaWaitTime)
+          }
+        }
       }catch {
         case e => // ignoring
       }
-      val newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)
-      newLeaderEpoch match {
-        case Some(epoch) =>
-          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
-          // TODO: Become leader as part of KAFKA-302
+      val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
+        replica.partition.partitionId, replica.brokerId)
+      newLeaderEpochAndISR match {
+        case Some(epochAndISR) =>
+          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic,
+            replica.partition.partitionId))
+          info("Current ISR for topic %s partition %d is %s".format(replica.topic, replica.partition.partitionId,
+                                                                    epochAndISR._2.mkString(",")))
+          becomeLeader(replica, epochAndISR._2)
         case None =>
+          ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+            case Some(leader) =>
+              becomeFollower(replica, leader, zkClient)
+            case None =>
+              error("Lost leader for topic %s partition %d right after leader election".format(replica.topic,
+                replica.partition.partitionId))
+          }
       }
     }
   }
 
+  private def canBecomeLeader(brokerId: Int, topic: String, partition: Int, assignedReplicas: Seq[Int],
+                              inSyncReplicas: Seq[Int], liveBrokers: Seq[Int]): Boolean = {
+    // TODO: raise alert, mark the partition offline if no broker in the assigned replicas list is alive
+    assert(assignedReplicas.size > 0, "There should be at least one replica in the assigned replicas list for topic " +
+      " %s partition %d".format(topic, partition))
+    inSyncReplicas.size > 0 match {
+      case true => // check if this broker is in the ISR. If yes, return true
+        inSyncReplicas.contains(brokerId) match {
+          case true =>
+            info("Broker %d can become leader since it is in the ISR %s".format(brokerId, inSyncReplicas.mkString(",")) +
+              " for topic %s partition %d".format(topic, partition))
+            true
+          case false =>
+            // check if any broker in the ISR is alive. If not, return true only if this broker is in the AR
+            val liveBrokersInISR = inSyncReplicas.filter(r => liveBrokers.contains(r))
+            liveBrokersInISR.isEmpty match {
+              case true =>
+                if(assignedReplicas.contains(brokerId)) {
+                  info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
+                    " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
+                      .format(partition, brokerId, assignedReplicas.mkString(",")))
+                  true
+                }else {
+                  info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
+                    " partition %d is alive. Broker %d cannot become leader since it is not in the assigned replicas %s"
+                      .format(partition, brokerId, assignedReplicas.mkString(",")))
+                  false
+                }
+              case false =>
+                info("ISR for topic %s partition %d is %s. Out of these, brokers %s are alive. Broker %d "
+                  .format(topic, partition, inSyncReplicas.mkString(","), liveBrokersInISR.mkString(","), brokerId) +
+                  "cannot become leader since it doesn't exist in the ISR")
+                false  // let one of the live brokers in the ISR become the leader
+            }
+        }
+      case false =>
+        if(assignedReplicas.contains(brokerId)) {
+          info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
+            .format(topic, partition, brokerId) + "is part of the assigned replicas list")
+          true
+        }else {
+          info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
+            .format(topic, partition, brokerId) + "is not part of the assigned replicas list")
+          false
+        }
+    }
+  }
+
   class TopicChangeListener extends IZkChildListener with Logging {
+    private val allTopics = new HashSet[String]()
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
@@ -233,12 +314,16 @@ class KafkaZooKeeper(config: KafkaConfig
         debug("Topic/partition change listener fired for path " + parentPath)
         import scala.collection.JavaConversions._
         val currentChildren = asBuffer(curChilds)
+        allTopics.clear()
         // check if topic has changed or a partition for an existing topic has changed
         if(parentPath == ZkUtils.BrokerTopicsPath) {
           val currentTopics = currentChildren
           debug("New topics " + currentTopics.mkString(","))
           // for each new topic [topic], watch the path /brokers/topics/[topic]/partitions
-          currentTopics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this))
+          currentTopics.foreach { topic =>
+            zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this)
+            allTopics += topic
+          }
           handleNewTopics(currentTopics)
         }else {
           val topic = parentPath.split("/").takeRight(2).head
@@ -248,6 +333,10 @@ class KafkaZooKeeper(config: KafkaConfig
         }
       }
     }
+
+    def doesTopicExistInCluster(topic: String): Boolean = {
+      allTopics.contains(topic)
+    }
   }
 
   private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = {
@@ -275,9 +364,19 @@ class KafkaZooKeeper(config: KafkaConfig
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       // handle leader change event for path
-      val newLeader: String = data.asInstanceOf[String]
-      debug("Leader change listener fired for path %s. New leader is %s".format(dataPath, newLeader))
-      // TODO: update the leader in the list of replicas maintained by the log manager
+      val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
+      val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
+      val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
+      debug("Leader change listener fired for path %s. New leader is %d. New epoch is %d".format(dataPath, newLeader, newEpoch))
+      val topicPartitionInfo = dataPath.split("/")
+      val topic = topicPartitionInfo.takeRight(4).head
+      val partition = topicPartitionInfo.takeRight(2).head.toInt
+      info("Updating leader change information in replica for topic %s partition %d".format(topic, partition))
+      val replica = getReplicaCbk(topic, partition).getOrElse(null)
+      assert(replica != null, "Replica for topic %s partition %d should exist on broker %d"
+        .format(topic, partition, config.brokerId))
+      replica.partition.leaderId(Some(newLeader))
+      assert(getReplicaCbk(topic, partition).get.partition.leaderId().get == newLeader, "New leader should be set correctly")
     }
 
     @throws(classOf[Exception])
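
Taken together, canBecomeLeader above encodes one rule: prefer brokers already in the ISR,
and fall back to the assigned replicas (AR) only when the ISR is empty or none of its
members are alive. A condensed sketch of that rule (simplified free-standing function,
not the class's actual method):

    def eligibleForLeadership(brokerId: Int, ar: Seq[Int], isr: Seq[Int],
                              liveBrokers: Seq[Int]): Boolean =
      if (isr.nonEmpty)
        isr.contains(brokerId) ||
          (isr.forall(r => !liveBrokers.contains(r)) && ar.contains(brokerId))
      else
        ar.contains(brokerId)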


