kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: KAFKA-1001; Handle follower transition in batch; patched by Guozhang Wang; reviewed by Jun Rao
Date Tue, 29 Oct 2013 20:11:30 GMT
Updated Branches:
  refs/heads/trunk 274b12f33 -> bd49e4f3e


KAFKA-1001; Handle follower transition in batch; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd49e4f3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd49e4f3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd49e4f3

Branch: refs/heads/trunk
Commit: bd49e4f3e77c86a9cd93262e628143fff762d4ee
Parents: 274b12f
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Tue Oct 29 13:11:17 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Oct 29 13:11:17 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  66 +----
 .../main/scala/kafka/common/ErrorMapping.scala  |   1 +
 .../scala/kafka/common/TopicAndPartition.scala  |   6 +
 .../kafka/consumer/ConsumerFetcherManager.scala |  34 +--
 .../kafka/consumer/ConsumerFetcherThread.scala  |   2 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  16 ++
 .../kafka/server/AbstractFetcherManager.scala   |  48 ++--
 .../kafka/server/AbstractFetcherThread.scala    |  27 +-
 .../scala/kafka/server/ReplicaManager.scala     | 254 +++++++++++++------
 9 files changed, 275 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
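The core of the change: instead of handling each partition's become-leader / become-follower transition one at a time (stopping and restarting fetchers per partition), ReplicaManager now validates the leader epoch of every partition in a LeaderAndIsr request up front, splits the surviving partitions into a to-be-leader set and a to-be-follower set, and performs each transition once per batch. A minimal, self-contained sketch of that control flow, using hypothetical simplified types rather than the actual Kafka classes:

object BatchTransitionSketch {
  case class TopicPartition(topic: String, partition: Int)
  case class PartitionState(leader: Int, leaderEpoch: Int)

  // Returns a per-partition result: become-leader, become-follower, or stale-leader-epoch.
  def becomeLeaderOrFollower(brokerId: Int,
                             requested: Map[TopicPartition, PartitionState],
                             currentEpochs: Map[TopicPartition, Int]): Map[TopicPartition, String] = {
    // 1. Drop partitions whose requested leader epoch is not newer than the locally cached one.
    val (valid, stale) = requested.partition { case (tp, state) =>
      state.leaderEpoch > currentEpochs.getOrElse(tp, -1)
    }
    // 2. Split the surviving partitions by the role this broker should take.
    val (toLead, toFollow) = valid.partition { case (_, state) => state.leader == brokerId }

    // 3. In the real patch, makeLeaders/makeFollowers stop fetchers, truncate logs and
    //    update partition state once per batch; here we only record the decision.
    toLead.keys.map(_ -> "become-leader").toMap ++
      toFollow.keys.map(_ -> "become-follower").toMap ++
      stale.keys.map(_ -> "stale-leader-epoch").toMap
  }

  def main(args: Array[String]): Unit = {
    val request = Map(
      TopicPartition("orders", 0) -> PartitionState(leader = 1, leaderEpoch = 5),
      TopicPartition("orders", 1) -> PartitionState(leader = 2, leaderEpoch = 5),
      TopicPartition("orders", 2) -> PartitionState(leader = 1, leaderEpoch = 1))
    becomeLeaderOrFollower(brokerId = 1, request, currentEpochs = Map(TopicPartition("orders", 2) -> 3))
      .foreach(println)
  }
}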


http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 5ccecd1..d8078bd 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -41,7 +41,6 @@ class Partition(val topic: String,
                 val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   private val localBrokerId = replicaManager.config.brokerId
   private val logManager = replicaManager.logManager
-  private val replicaFetcherManager = replicaManager.replicaFetcherManager
   private val zkClient = replicaManager.zkClient
   var leaderReplicaIdOpt: Option[Int] = None
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
@@ -132,30 +131,23 @@ class Partition(val topic: String,
     assignedReplicaMap.values.toSet
   }
 
+  def getLeaderEpoch(): Int = {
+    leaderIsrUpdateLock synchronized {
+      return this.leaderEpoch
+    }
+  }
+
 
   /**
-   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
-   *  1. stop the existing replica fetcher
-   *  2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
-   *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
-   *  4. set the new leader and ISR
+   * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
+   *  and setting the new leader and ISR
    */
-  def makeLeader(controllerId: Int, topic: String, partitionId: Int,
-                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
+  def makeLeader(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " +
-                                 "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d")
-                                   .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
-                                           partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
-        return false
-      }
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-      // stop replica fetcher thread, if any
-      replicaFetcherManager.removeFetcher(topic, partitionId)
 
       val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
       // reset LogEndOffset for remote replicas
@@ -171,52 +163,22 @@ class Partition(val topic: String,
   }
 
   /**
-   *  If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the follower in the following steps.
-   *  1. stop any existing fetcher on this partition from the local replica
-   *  2. make sure local replica exists and truncate the log to high watermark
-   *  3. set the leader and set ISR to empty
-   *  4. start a fetcher to the new leader
+   *  Make the local replica the follower by setting the new leader and ISR to empty
    */
-  def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                   leaders: Set[Broker], correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, leaders: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-      if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
-        stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " +
-                                 "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d")
-                                   .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
-                                           partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
-        return false
-      }
-      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
-      // to maintain the decision maker controller's epoch in the zookeeper path
-      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-      // make sure local replica exists. This reads the last check pointed high watermark from disk. On startup, it is
-      // important to ensure that this operation happens for every single partition in a leader and isr request, else
-      // some high watermark values could be overwritten with 0. This leads to replicas fetching from the earliest offset
-      // on the leader
-      val localReplica = getOrCreateReplica()
       val newLeaderBrokerId: Int = leaderAndIsr.leader
       // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
       leaders.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
-          // stop fetcher thread to previous leader
-          replicaFetcherManager.removeFetcher(topic, partitionId)
-          localReplica.log.get.truncateTo(localReplica.highWatermark)
-          logManager.checkpointRecoveryPointOffsets()
+          // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+          // to maintain the decision maker controller's epoch in the zookeeper path
+          controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
           inSyncReplicas = Set.empty[Replica]
           leaderEpoch = leaderAndIsr.leaderEpoch
           zkVersion = leaderAndIsr.zkVersion
           leaderReplicaIdOpt = Some(newLeaderBrokerId)
-          if (!replicaManager.isShuttingDown.get()) {
-            // start fetcher thread to current leader if we are not shutting down
-            replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
-          }
-          else {
-            stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
-                                     "controller %d epoch %d since it is shutting down")
-                                      .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch))
-          }
         case None => // we should not come here
          stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
                                   "controller %d epoch %d for partition [%s,%d] new leader %d")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 153bc0b..b0b5dce 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -42,6 +42,7 @@ object ErrorMapping {
   val MessageSizeTooLargeCode: Short = 10
   val StaleControllerEpochCode: Short = 11
   val OffsetMetadataTooLargeCode: Short = 12
+  val StaleLeaderEpochCode: Short = 13
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 63596b7..df3db91 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,5 +1,7 @@
 package kafka.common
 
+import kafka.cluster.{Replica, Partition}
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -24,6 +26,10 @@ case class TopicAndPartition(topic: String, partition: Int) {
 
   def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
 
+  def this(partition: Partition) = this(partition.topic, partition.partitionId)
+
+  def this(replica: Replica) = this(replica.topic, replica.partitionId)
+
   def asTuple = (topic, partition)
 
   override def toString = "[%s,%d]".format(topic, partition)
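The two auxiliary constructors let the ReplicaManager changes below write `new TopicAndPartition(partition)` or `new TopicAndPartition(replica)` instead of repeating the topic and partition id at every call site. A self-contained illustration of the pattern, with stand-in Partition and Replica classes rather than the real ones:

object TopicAndPartitionSketch {
  class Partition(val topic: String, val partitionId: Int)
  class Replica(val topic: String, val partitionId: Int)

  case class TopicAndPartition(topic: String, partition: Int) {
    def this(partition: Partition) = this(partition.topic, partition.partitionId)
    def this(replica: Replica) = this(replica.topic, replica.partitionId)
    override def toString = "[%s,%d]".format(topic, partition)
  }

  def main(args: Array[String]): Unit = {
    val p = new Partition("orders", 3)
    println(new TopicAndPartition(p))   // prints [orders,3]
  }
}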

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 566ca46..e4451bb 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -18,9 +18,10 @@
 package kafka.consumer
 
 import org.I0Itec.zkclient.ZkClient
-import kafka.server.{AbstractFetcherThread, AbstractFetcherManager}
+import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
 import kafka.cluster.{Cluster, Broker}
 import scala.collection.immutable
+import scala.collection.Map
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
@@ -90,23 +91,22 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         lock.unlock()
       }
 
-      leaderForPartitionsMap.foreach {
-        case(topicAndPartition, leaderBroker) =>
-          val pti = partitionMap(topicAndPartition)
-          try {
-            addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
-          } catch {
-            case t: Throwable => {
-                if (!isRunning.get())
-                  throw t /* If this thread is stopped, propagate this exception to kill the thread. */
-                else {
-                  warn("Failed to add leader for partition %s; will retry".format(topicAndPartition), t)
-                  lock.lock()
-                  noLeaderPartitionSet += topicAndPartition
-                  lock.unlock()
-                }
-              }
+      try {
+        addFetcherForPartitions(leaderForPartitionsMap.map{
+          case (topicAndPartition, broker) =>
+            topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
+        )
+      } catch {
+        case t: Throwable => {
+          if (!isRunning.get())
+            throw t /* If this thread is stopped, propagate this exception to kill the thread. */
+          else {
+            warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
+            lock.lock()
+            noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
+            lock.unlock()
           }
+        }
       }
 
       shutdownIdleFetcherThreads()
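The per-partition add-and-retry loop becomes a single batched call: if addFetcherForPartitions throws while the manager is still running, the whole batch is put back into the no-leader set so the leader-finder thread retries it on its next pass. A small sketch of that retry pattern (hypothetical names; the real code guards the set with a ReentrantLock and a condition):

object LeaderRetrySketch {
  import scala.collection.mutable
  case class TopicAndPartition(topic: String, partition: Int)

  val noLeaderPartitionSet = mutable.Set[TopicAndPartition]()
  @volatile var isRunning = true

  def addFetchersOrRequeue(leaderForPartitions: Map[TopicAndPartition, Int])
                          (addBatch: Map[TopicAndPartition, Int] => Unit): Unit = {
    try addBatch(leaderForPartitions)
    catch {
      case t: Throwable =>
        if (!isRunning) throw t                                    // shutting down: let the thread die
        else noLeaderPartitionSet ++= leaderForPartitions.keySet   // requeue the whole batch for retry
    }
  }
}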

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index dda0a8f..f8c1b4e 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -66,7 +66,7 @@ class ConsumerFetcherThread(name: String,
 
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
-    partitions.foreach(tap => removePartition(tap.topic, tap.partition))
+    removePartitions(partitions.toSet)
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4719715..d489e08 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -177,6 +177,22 @@ class LogManager(val logDirs: Array[File],
     }
     debug("Shutdown complete.")
   }
+
+  /**
+   * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
+   *
+   * @param partitionAndOffsets Partition logs that need to be truncated
+   */
+  def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]) {
+    for ((topicAndPartition, truncateOffset) <- partitionAndOffsets) {
+      val log = logs.get(topicAndPartition)
+      // If the log does not exist, skip it
+      if (log != null) {
+        log.truncateTo(truncateOffset)
+      }
+    }
+    checkpointRecoveryPointOffsets()
+  }
   
   /**
    * Write out the current recovery point for all logs to a text file in the log directory
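LogManager.truncateTo gives ReplicaManager a single call that truncates every follower log to its target offset and then writes the recovery-point checkpoint once, rather than checkpointing per partition. A minimal sketch of that batch-truncate pattern, with stand-in Log and TopicAndPartition types rather than the real LogManager API:

object TruncateToSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  class Log(var endOffset: Long) {
    def truncateTo(offset: Long): Unit = { if (offset < endOffset) endOffset = offset }
  }

  class LogManagerSketch(logs: Map[TopicAndPartition, Log]) {
    private def checkpointRecoveryPointOffsets(): Unit =
      println("checkpointed recovery points for %d logs".format(logs.size))

    def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]): Unit = {
      for ((tp, offset) <- partitionAndOffsets; log <- logs.get(tp))
        log.truncateTo(offset)          // partitions without a local log are skipped
      checkpointRecoveryPointOffsets()  // one checkpoint write for the whole batch
    }
  }

  def main(args: Array[String]): Unit = {
    val tp = TopicAndPartition("orders", 0)
    val lm = new LogManagerSketch(Map(tp -> new Log(100L)))
    lm.truncateTo(Map(tp -> 42L, TopicAndPartition("missing", 1) -> 7L))
  }
}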


http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 15b7bd3..394e981 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -18,14 +18,17 @@
 package kafka.server
 
 import scala.collection.mutable
+import scala.collection.Set
+import scala.collection.Map
 import kafka.utils.Logging
 import kafka.cluster.Broker
 import kafka.metrics.KafkaMetricsGroup
+import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge
 
abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1)
   extends Logging with KafkaMetricsGroup {
-    // map of (source brokerid, fetcher Id per source broker) => fetcher
+  // map of (source broker_id, fetcher_id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
@@ -60,36 +63,43 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   )
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
-    (topic.hashCode() + 31 * partitionId) % numFetchers
+    (31 * topic.hashCode() + partitionId) % numFetchers
   }
 
   // to be defined in subclass to create a specific fetcher
   def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
 
-  def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
+  def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
     mapLock synchronized {
-      var fetcherThread: AbstractFetcherThread = null
-      val key = new BrokerAndFetcherId(sourceBroker, getFetcherId(topic, partitionId))
-      fetcherThreadMap.get(key) match {
-        case Some(f) => fetcherThread = f
-        case None =>
-          fetcherThread = createFetcherThread(key.fetcherId, sourceBroker)
-          fetcherThreadMap.put(key, fetcherThread)
-          fetcherThread.start
+      val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
+        BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
+      for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
+        var fetcherThread: AbstractFetcherThread = null
+        fetcherThreadMap.get(brokerAndFetcherId) match {
+          case Some(f) => fetcherThread = f
+          case None =>
+            fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
+            fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
+            fetcherThread.start
+        }
+
+        fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
+          topicAndPartition -> brokerAndInitOffset.initOffset
+        })
       }
-      fetcherThread.addPartition(topic, partitionId, initialOffset)
-      info("Adding fetcher for partition [%s,%d], initOffset %d to broker %d with fetcherId %d"
-          .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
+
+    info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
+      "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
   }
 
-  def removeFetcher(topic: String, partitionId: Int) {
-    info("Removing fetcher for partition [%s,%d]".format(topic, partitionId))
+  def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) {
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
-        fetcher.removePartition(topic, partitionId)
+        fetcher.removePartitions(partitions)
       }
     }
+    info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
   }
 
   def shutdownIdleFetcherThreads() {
@@ -115,4 +125,6 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   }
 }
 
-case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
\ No newline at end of file
+case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
+
+case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
\ No newline at end of file
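addFetcherForPartitions buckets the incoming partition-to-offset map by (source broker, fetcher id), so each fetcher thread is created at most once and receives its whole share of partitions in a single addPartitions call. A self-contained sketch of the grouping step (stand-in types; only the hash-based assignment mirrors the patched getFetcherId):

object FetcherGroupingSketch {
  case class Broker(id: Int)
  case class TopicAndPartition(topic: String, partition: Int)
  case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
  case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)

  val numFetchers = 3

  // Same hash-based assignment as the patched getFetcherId
  def getFetcherId(topic: String, partitionId: Int): Int =
    (31 * topic.hashCode() + partitionId) % numFetchers

  def main(args: Array[String]): Unit = {
    val batch = Map(
      TopicAndPartition("orders", 0) -> BrokerAndInitialOffset(Broker(1), 10L),
      TopicAndPartition("orders", 1) -> BrokerAndInitialOffset(Broker(1), 20L),
      TopicAndPartition("users", 0)  -> BrokerAndInitialOffset(Broker(2), 0L))

    // One bucket per (broker, fetcher id); each bucket becomes a single addPartitions call.
    val perFetcher = batch.groupBy { case (tp, bo) =>
      BrokerAndFetcherId(bo.broker, getFetcherId(tp.topic, tp.partition))
    }
    perFetcher.foreach { case (key, partitions) =>
      println("fetcher %s gets %s".format(key, partitions.keys.mkString(",")))
    }
  }
}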

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index c64260f..bb2dd90 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,17 +19,19 @@ package kafka.server
 
 import kafka.cluster.Broker
 import collection.mutable
+import scala.collection.Set
+import scala.collection.Map
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.{Pool, ShutdownableThread}
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import kafka.utils.Utils.inLock
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.atomic.AtomicLong
 
 
 /**
@@ -166,23 +168,26 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     }
   }
 
-  def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
+  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
     partitionMapLock.lockInterruptibly()
     try {
-      val topicPartition = TopicAndPartition(topic, partitionId)
-      partitionMap.put(
-          topicPartition,
-          if (PartitionTopicInfo.isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(topicPartition) else initialOffset)
+      for ((topicAndPartition, offset) <- partitionAndOffsets) {
+        // If the partitionMap already has the topic/partition, then do not update the map with the old offset
+        if (!partitionMap.contains(topicAndPartition))
+          partitionMap.put(
+            topicAndPartition,
+            if (PartitionTopicInfo.isOffsetInvalid(offset)) handleOffsetOutOfRange(topicAndPartition) else offset)
+      }
       partitionMapCond.signalAll()
     } finally {
       partitionMapLock.unlock()
     }
   }
 
-  def removePartition(topic: String, partitionId: Int) {
+  def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
     partitionMapLock.lockInterruptibly()
     try {
-      partitionMap.remove(TopicAndPartition(topic, partitionId))
+      topicAndPartitions.foreach(tp => partitionMap.remove(tp))
     } finally {
       partitionMapLock.unlock()
     }
@@ -199,7 +204,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 }
 
class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
-  private[this] var lagVal = new AtomicLong(-1L)
+  private[this] val lagVal = new AtomicLong(-1L)
   newGauge(
     metricId + "-ConsumerLag",
     new Gauge[Long] {
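The batched addPartitions keeps any offset the fetcher is already tracking and only resolves invalid offsets for genuinely new partitions; removePartitions simply drops the given set. A simplified sketch of that bookkeeping (a plain mutable map standing in for the fetcher thread's partitionMap and its lock, and a negative value standing in for an invalid offset):

object FetcherThreadBookkeepingSketch {
  import scala.collection.mutable
  case class TopicAndPartition(topic: String, partition: Int)

  private val partitionMap = mutable.Map[TopicAndPartition, Long]()

  // Stand-in for asking the leader for a valid starting offset.
  private def handleOffsetOutOfRange(tp: TopicAndPartition): Long = 0L

  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]): Unit =
    for ((tp, offset) <- partitionAndOffsets if !partitionMap.contains(tp))
      partitionMap(tp) = if (offset < 0) handleOffsetOutOfRange(tp) else offset

  def removePartitions(tps: Set[TopicAndPartition]): Unit =
    tps.foreach(partitionMap.remove)

  def main(args: Array[String]): Unit = {
    val tp0 = TopicAndPartition("orders", 0)
    addPartitions(Map(tp0 -> 5L))
    addPartitions(Map(tp0 -> 99L, TopicAndPartition("orders", 1) -> -1L))
    println(partitionMap)   // tp0 keeps offset 5; partition 1 starts from the resolved offset
  }
}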

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ee1cc0c..7b8f89e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,8 +28,8 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.common._
-import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
-import kafka.controller.KafkaController
+import kafka.api.{StopReplicaRequest, LeaderAndIsrRequest}
+import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
 
 
@@ -50,6 +50,7 @@ class ReplicaManager(val config: KafkaConfig,
   private val allPartitions = new Pool[(String, Int), Partition]
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
+  private val replicaStateChangeLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
@@ -116,7 +117,6 @@ class ReplicaManager(val config: KafkaConfig,
     val errorCode = ErrorMapping.NoError
     getReplica(topic, partitionId) match {
       case Some(replica) =>
-        replicaFetcherManager.removeFetcher(topic, partitionId)
         /* TODO: handle deleteLog in a better way */
         //if (deletePartition)
         //  logManager.deleteLog(topic, partition)
@@ -132,20 +132,26 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
-    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
-    if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
-      stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
-        .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
-            " Latest known controller epoch is %d " + controllerEpoch)
-      (responseMap, ErrorMapping.StaleControllerEpochCode)
-    } else {
-      controllerEpoch = stopReplicaRequest.controllerEpoch
-      val responseMap = new HashMap[(String, Int), Short]
-      for((topic, partitionId) <- stopReplicaRequest.partitions){
-        val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
-        responseMap.put((topic, partitionId), errorCode)
+    replicaStateChangeLock synchronized {
+      val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+      if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
+        stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
+          .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
+          " Latest known controller epoch is %d " + controllerEpoch)
+        (responseMap, ErrorMapping.StaleControllerEpochCode)
+      } else {
+        controllerEpoch = stopReplicaRequest.controllerEpoch
+        val responseMap = new HashMap[(String, Int), Short]
+        // First stop fetchers for all partitions, then stop the corresponding replicas
+        replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map{
+          case (topic, partition) => TopicAndPartition(topic, partition)
+        })
+        for((topic, partitionId) <- stopReplicaRequest.partitions){
+          val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
+          responseMap.put((topic, partitionId), errorCode)
+        }
+        (responseMap, ErrorMapping.NoError)
       }
-      (responseMap, ErrorMapping.NoError)
     }
   }
 
@@ -198,88 +204,176 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
-    leaderAndISRRequest.partitionStateInfos.foreach(p =>
+    leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partition), stateInfo) =>
       stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]"
                                 .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
-                                        leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2)))
+                                        leaderAndISRRequest.controllerEpoch, topic, partition))}
     info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest))
 
-    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
-    if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-      stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d. Latest known controller epoch is %d"
-                                .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch))
-      (responseMap, ErrorMapping.StaleControllerEpochCode)
-    }else {
-      val controllerId = leaderAndISRRequest.controllerId
-      controllerEpoch = leaderAndISRRequest.controllerEpoch
-      for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
-        var errorCode = ErrorMapping.NoError
-        val topic = topicAndPartition._1
-        val partitionId = topicAndPartition._2
-
-        val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
-        try {
-          if(requestedLeaderId == config.brokerId)
-            makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
-          else
-            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
-                         leaderAndISRRequest.correlationId)
-        } catch {
-          case e: Throwable =>
-            val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
-                            "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
-                                                                leaderAndISRRequest.controllerEpoch, topicAndPartition)
-            stateChangeLogger.error(errorMsg, e)
-            errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+    replicaStateChangeLock synchronized {
+      val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+      if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
+        stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d. Latest known controller epoch is %d"
+          .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch))
+        (responseMap, ErrorMapping.StaleControllerEpochCode)
+      } else {
+        val controllerId = leaderAndISRRequest.controllerId
+        val correlationId = leaderAndISRRequest.correlationId
+        controllerEpoch = leaderAndISRRequest.controllerEpoch
+
+        // First check partition's leader epoch
+        val partitionleaderIsrAndControllerEpoch = new HashMap[Partition, LeaderIsrAndControllerEpoch]()
+        leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
+          val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
+          val partitionLeaderEpoch = partition.getLeaderEpoch()
+          if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
+            // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+            // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
+            partitionleaderIsrAndControllerEpoch.put(partition, partitionStateInfo.leaderIsrAndControllerEpoch)
+          } else {
+            // Otherwise record the error code in response
+            stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
+              "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d")
+              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch,
+              partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition, partitionLeaderEpoch))
+            responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
+          }
         }
-        responseMap.put(topicAndPartition, errorCode)
-        stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
-          .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
-          topicAndPartition._1, topicAndPartition._2))
-      }
-      info("Handled leader and isr request %s".format(leaderAndISRRequest))
-      // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
-      // have been completely populated before starting the checkpointing there by avoiding weird race conditions
-      if (!hwThreadInitialized) {
-        startHighWaterMarksCheckPointThread()
-        hwThreadInitialized = true
+
+        val partitionsTobeLeader = partitionleaderIsrAndControllerEpoch
+          .filter{ case (partition, leaderIsrAndControllerEpoch) => leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
+        val partitionsTobeFollower = (partitionleaderIsrAndControllerEpoch -- partitionsTobeLeader.keys)
+
+        if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
+        if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
+
+        info("Handled leader and isr request %s".format(leaderAndISRRequest))
+        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
+        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
+        if (!hwThreadInitialized) {
+          startHighWaterMarksCheckPointThread()
+          hwThreadInitialized = true
+        }
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        (responseMap, ErrorMapping.NoError)
       }
-      replicaFetcherManager.shutdownIdleFetcherThreads()
-      (responseMap, ErrorMapping.NoError)
     }
   }
 
-  private def makeLeader(controllerId: Int, epoch:Int, topic: String, partitionId: Int,
-                         partitionStateInfo: PartitionStateInfo, correlationId: Int) = {
-    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+  /*
+   * Make the current broker to become leader for a given set of partitions by:
+   *
+   * 1. Stop fetchers for these partitions
+   * 2. Update the partition metadata in cache
+   * 3. Add these partitions to the leader partitions set
+   *
+   * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
+   * the error message will be set on each partition since we do not know which partition caused it
+   *  TODO: the above may need to be fixed later
+   */
+  private def makeLeaders(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+                          correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-                             "starting the become-leader transition for partition [%s,%d]")
-                               .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
-    val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, correlationId)) {
-      // also add this partition to the list of partitions for which the leader is the current broker
+      "starting the become-leader transition for partitions %s")
+      .format(localBrokerId, correlationId, controllerId, epoch,
+      partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+
+    for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+      responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
+
+    try {
+      // First stop fetchers for all the partitions
+      replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+      stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
+        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+
+      // Update the partition information to be the leader
+      partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeLeader(controllerId, leaderIsrAndControllerEpoch, correlationId)}
+
+      // Finally add these partitions to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
-        leaderPartitions += partition
-      } 
+        leaderPartitions ++= partitionLeaderISRAndControllerEpochs.keySet
+      }
+    } catch {
+      case e: Throwable =>
+        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
+          "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
+        stateChangeLogger.error(errorMsg, e)
+        // Re-throw the exception for it to be caught in KafkaApis
+        throw e
     }
-    stateChangeLogger.trace("Broker %d completed become-leader transition for partition [%s,%d]".format(localBrokerId, topic, partitionId))
+
+    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+      "for the become-leader transition for partitions %s")
+      .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
   }
 
-  private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
-                           partitionStateInfo: PartitionStateInfo, leaders: Set[Broker], correlationId: Int) {
-    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+  /*
+   * Make the current broker to become follower for a given set of partitions by:
+   *
+   * 1. Stop fetchers for these partitions
+   * 2. Truncate the log and checkpoint offsets for these partitions.
+   * 3. If the broker is not shutting down, add the fetcher to the new leaders
+   * 4. Update the partition metadata in cache
+   * 5. Remove these partitions from the leader partitions set
+   *
+   * The ordering of doing these steps make sure that the replicas in transition will not
+   * take any more messages before checkpointing offsets so that all messages before the checkpoint
+   * are guaranteed to be flushed to disks
+   *
+   * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
+   * the error message will be set on each partition since we do not know which partition caused it
+   *  TODO: the above may need to be fixed later
+   */
+  private def makeFollowers(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+                            leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-                             "starting the become-follower transition for partition [%s,%d]")
-                               .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
+      "starting the become-follower transition for partitions %s")
+      .format(localBrokerId, correlationId, controllerId, epoch,
+      partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+
+    for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+      responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
+
+    try {
+      replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+      stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
+        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+
+      logManager.truncateTo(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
+        new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
+      })
+      stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d"
+        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+
+      if (!isShuttingDown.get()) {
+        replicaFetcherManager.addFetcherForPartitions(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
+          new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == leaderISRAndControllerEpoch.leaderAndIsr.leader).get, partition.getReplica().get.logEndOffset)}
+        )
+      }
+      else {
+        stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
+          "controller %d epoch %d since it is shutting down")
+          .format(localBrokerId, correlationId, controllerId, epoch))
+      }
+
+      partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
 
-    val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, leaders, correlationId)) {
-      // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
-        leaderPartitions -= partition
+        leaderPartitions --= partitionLeaderISRAndControllerEpochs.keySet
       }
+    } catch {
+      case e: Throwable =>
+        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
+          "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
+        stateChangeLogger.error(errorMsg, e)
+        // Re-throw the exception for it to be caught in KafkaApis
+        throw e
     }
-    stateChangeLogger.trace("Broker %d completed the become-follower transition for partition [%s,%d]".format(localBrokerId, topic, partitionId))
+
+    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+      "for the become-follower transition for partitions %s")
+      .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
   }
 
   private def maybeShrinkIsr(): Unit = {
@@ -307,7 +401,7 @@ class ReplicaManager(val config: KafkaConfig,
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
     for((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
+      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
       try {
         highWatermarkCheckpoints(dir).write(hwms)
       } catch {
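The ordering inside makeFollowers is what makes the batch transition safe: fetchers are stopped for the whole set before the logs are truncated to the high watermark and checkpointed, so no message can be appended between the checkpoint and the switch to the new leader; only then are fetchers added back (unless the broker is shutting down) and the cached partition state updated. A sketch of that ordering with hypothetical helper functions, where only the sequence of steps is the point:

object BecomeFollowerOrderingSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  def makeFollowers(partitions: Set[TopicAndPartition],
                    highWatermark: TopicAndPartition => Long,
                    isShuttingDown: Boolean,
                    stopFetchers: Set[TopicAndPartition] => Unit,
                    truncateAndCheckpoint: Map[TopicAndPartition, Long] => Unit,
                    addFetchers: Set[TopicAndPartition] => Unit,
                    updateMetadata: Set[TopicAndPartition] => Unit): Unit = {
    stopFetchers(partitions)                                                    // 1. no more appends from fetchers
    truncateAndCheckpoint(partitions.map(tp => tp -> highWatermark(tp)).toMap)  // 2. truncate to HW, checkpoint once
    if (!isShuttingDown) addFetchers(partitions)                                // 3. start fetching from the new leaders
    updateMetadata(partitions)                                                  // 4. switch cached leader/ISR state
  }
}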

