Updated Branches:
refs/heads/trunk 274b12f33 -> bd49e4f3e
KAFKA-1001; Handle follower transition in batch; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd49e4f3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd49e4f3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd49e4f3
Branch: refs/heads/trunk
Commit: bd49e4f3e77c86a9cd93262e628143fff762d4ee
Parents: 274b12f
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Tue Oct 29 13:11:17 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Oct 29 13:11:17 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 66 +----
.../main/scala/kafka/common/ErrorMapping.scala | 1 +
.../scala/kafka/common/TopicAndPartition.scala | 6 +
.../kafka/consumer/ConsumerFetcherManager.scala | 34 +--
.../kafka/consumer/ConsumerFetcherThread.scala | 2 +-
core/src/main/scala/kafka/log/LogManager.scala | 16 ++
.../kafka/server/AbstractFetcherManager.scala | 48 ++--
.../kafka/server/AbstractFetcherThread.scala | 27 +-
.../scala/kafka/server/ReplicaManager.scala | 254 +++++++++++++------
9 files changed, 275 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
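At a high level, this patch replaces the per-partition become-leader/become-follower handling with a single batched pass in ReplicaManager.becomeLeaderOrFollower. The following standalone Scala sketch (toy types only, not the actual Kafka classes) illustrates the shape of that flow: drop partitions whose leader epoch is stale, split the rest into to-be-leaders and to-be-followers, then perform the fetcher and log operations once per group instead of once per partition.

    // Toy model of the batched transition; every type and name here is illustrative only.
    object BatchedTransitionSketch {
      case class TopicPartition(topic: String, partition: Int)
      case class PartitionState(leader: Int, leaderEpoch: Int)

      val localBrokerId = 1
      val cachedLeaderEpoch = scala.collection.mutable.Map[TopicPartition, Int]()

      def becomeLeaderOrFollower(request: Map[TopicPartition, PartitionState]): Unit = {
        // 1. Keep only partitions whose requested leader epoch is newer than the cached one
        val valid = request.filter { case (tp, state) => cachedLeaderEpoch.getOrElse(tp, -1) < state.leaderEpoch }
        // 2. Split into the partitions this broker will lead vs. follow
        val (toLead, toFollow) = valid.partition { case (_, state) => state.leader == localBrokerId }
        // 3. In the real patch each group then stops fetchers, truncates follower logs and
        //    re-adds fetchers in one batched call per group rather than per partition
        println(s"become leader for:   ${toLead.keySet}")
        println(s"become follower for: ${toFollow.keySet}")
        valid.foreach { case (tp, state) => cachedLeaderEpoch(tp) = state.leaderEpoch }
      }

      def main(args: Array[String]): Unit =
        becomeLeaderOrFollower(Map(
          TopicPartition("t", 0) -> PartitionState(leader = 1, leaderEpoch = 5),
          TopicPartition("t", 1) -> PartitionState(leader = 2, leaderEpoch = 3)))
    }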
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 5ccecd1..d8078bd 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -41,7 +41,6 @@ class Partition(val topic: String,
val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
private val localBrokerId = replicaManager.config.brokerId
private val logManager = replicaManager.logManager
- private val replicaFetcherManager = replicaManager.replicaFetcherManager
private val zkClient = replicaManager.zkClient
var leaderReplicaIdOpt: Option[Int] = None
var inSyncReplicas: Set[Replica] = Set.empty[Replica]
@@ -132,30 +131,23 @@ class Partition(val topic: String,
assignedReplicaMap.values.toSet
}
+ def getLeaderEpoch(): Int = {
+ leaderIsrUpdateLock synchronized {
+ return this.leaderEpoch
+ }
+ }
+
/**
- * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
- * 1. stop the existing replica fetcher
- * 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
- * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
- * 4. set the new leader and ISR
+ * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
+ * and setting the new leader and ISR
*/
- def makeLeader(controllerId: Int, topic: String, partitionId: Int,
- leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
+ def makeLeader(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
- if (leaderEpoch >= leaderAndIsr.leaderEpoch){
- stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation
id %d from " +
- "controller %d epoch %d for partition [%s,%d] since current
leader epoch %d is >= the request's leader epoch %d")
- .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
topic,
- partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
- return false
- }
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
- // stop replica fetcher thread, if any
- replicaFetcherManager.removeFetcher(topic, partitionId)
val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
// reset LogEndOffset for remote replicas
@@ -171,52 +163,22 @@ class Partition(val topic: String,
}
/**
- * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the follower in the following steps.
- * 1. stop any existing fetcher on this partition from the local replica
- * 2. make sure local replica exists and truncate the log to high watermark
- * 3. set the leader and set ISR to empty
- * 4. start a fetcher to the new leader
+ * Make the local replica the follower by setting the new leader and ISR to empty
*/
- def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
- leaders: Set[Broker], correlationId: Int): Boolean = {
+ def makeFollower(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, leaders: Set[Broker], correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
- if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
- stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation
id %d from " +
- "controller %d epoch %d for partition [%s,%d] since current
leader epoch %d is >= the request's leader epoch %d")
- .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
topic,
- partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
- return false
- }
- // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
- // to maintain the decision maker controller's epoch in the zookeeper path
- controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
- // make sure local replica exists. This reads the last check pointed high watermark from disk. On startup, it is
- // important to ensure that this operation happens for every single partition in a leader and isr request, else
- // some high watermark values could be overwritten with 0. This leads to replicas fetching from the earliest offset
- // on the leader
- val localReplica = getOrCreateReplica()
val newLeaderBrokerId: Int = leaderAndIsr.leader
// TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
leaders.find(_.id == newLeaderBrokerId) match {
case Some(leaderBroker) =>
- // stop fetcher thread to previous leader
- replicaFetcherManager.removeFetcher(topic, partitionId)
- localReplica.log.get.truncateTo(localReplica.highWatermark)
- logManager.checkpointRecoveryPointOffsets()
+ // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+ // to maintain the decision maker controller's epoch in the zookeeper path
+ controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
inSyncReplicas = Set.empty[Replica]
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt = Some(newLeaderBrokerId)
- if (!replicaManager.isShuttingDown.get()) {
- // start fetcher thread to current leader if we are not shutting down
- replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
- }
- else {
- stateChangeLogger.trace(("Broker %d ignored the become-follower state change
with correlation id %d from " +
- "controller %d epoch %d since it is shutting down")
- .format(localBrokerId, correlationId, controllerId,
leaderIsrAndControllerEpoch.controllerEpoch))
- }
case None => // we should not come here
stateChangeLogger.error(("Broker %d aborted the become-follower state change with
correlation id %d from " +
"controller %d epoch %d for partition [%s,%d] new leader
%d")
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 153bc0b..b0b5dce 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -42,6 +42,7 @@ object ErrorMapping {
val MessageSizeTooLargeCode: Short = 10
val StaleControllerEpochCode: Short = 11
val OffsetMetadataTooLargeCode: Short = 12
+ val StaleLeaderEpochCode: Short = 13
private val exceptionToCode =
Map[Class[Throwable], Short](
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 63596b7..df3db91 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,5 +1,7 @@
package kafka.common
+import kafka.cluster.{Replica, Partition}
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -24,6 +26,10 @@ case class TopicAndPartition(topic: String, partition: Int) {
def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
+ def this(partition: Partition) = this(partition.topic, partition.partitionId)
+
+ def this(replica: Replica) = this(replica.topic, replica.partitionId)
+
def asTuple = (topic, partition)
override def toString = "[%s,%d]".format(topic, partition)
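The two auxiliary constructors above let call sites build a TopicAndPartition straight from a Partition or Replica (both expose topic and partitionId). A small standalone usage sketch, with a stub standing in for the real kafka.cluster.Partition:

    // Stand-in types; the real Partition and Replica live in kafka.cluster.
    object TopicAndPartitionSketch extends App {
      class PartitionStub(val topic: String, val partitionId: Int)

      case class TopicAndPartition(topic: String, partition: Int) {
        def this(partition: PartitionStub) = this(partition.topic, partition.partitionId)
      }

      val p = new PartitionStub("orders", 3)
      println(new TopicAndPartition(p))   // prints TopicAndPartition(orders,3)
    }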
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 566ca46..e4451bb 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -18,9 +18,10 @@
package kafka.consumer
import org.I0Itec.zkclient.ZkClient
-import kafka.server.{AbstractFetcherThread, AbstractFetcherManager}
+import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
import kafka.cluster.{Cluster, Broker}
import scala.collection.immutable
+import scala.collection.Map
import collection.mutable.HashMap
import scala.collection.mutable
import java.util.concurrent.locks.ReentrantLock
@@ -90,23 +91,22 @@ class ConsumerFetcherManager(private val consumerIdString: String,
lock.unlock()
}
- leaderForPartitionsMap.foreach {
- case(topicAndPartition, leaderBroker) =>
- val pti = partitionMap(topicAndPartition)
- try {
- addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
- } catch {
- case t: Throwable => {
- if (!isRunning.get())
- throw t /* If this thread is stopped, propagate this exception to kill the thread. */
- else {
- warn("Failed to add leader for partition %s; will retry".format(topicAndPartition), t)
- lock.lock()
- noLeaderPartitionSet += topicAndPartition
- lock.unlock()
- }
- }
+ try {
+ addFetcherForPartitions(leaderForPartitionsMap.map{
+ case (topicAndPartition, broker) =>
+ topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
+ )
+ } catch {
+ case t: Throwable => {
+ if (!isRunning.get())
+ throw t /* If this thread is stopped, propagate this exception to kill the thread. */
+ else {
+ warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
+ lock.lock()
+ noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
+ lock.unlock()
}
+ }
}
shutdownIdleFetcherThreads()
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index dda0a8f..f8c1b4e 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -66,7 +66,7 @@ class ConsumerFetcherThread(name: String,
// any logic for partitions whose leader has changed
def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
- partitions.foreach(tap => removePartition(tap.topic, tap.partition))
+ removePartitions(partitions.toSet)
consumerFetcherManager.addPartitionsWithError(partitions)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4719715..d489e08 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -177,6 +177,22 @@ class LogManager(val logDirs: Array[File],
}
debug("Shutdown complete.")
}
+
+ /**
+ * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
+ *
+ * @param partitionAndOffsets Partition logs that need to be truncated
+ */
+ def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]) {
+ for ((topicAndPartition, truncateOffset) <- partitionAndOffsets) {
+ val log = logs.get(topicAndPartition)
+ // If the log does not exist, skip it
+ if (log != null) {
+ log.truncateTo(truncateOffset)
+ }
+ }
+ checkpointRecoveryPointOffsets()
+ }
/**
* Write out the current recovery point for all logs to a text file in the log directory
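The new LogManager.truncateTo takes a map from partition to target offset, truncates each log that exists, silently skips missing ones, and checkpoints recovery points once for the whole batch. A standalone sketch of that contract (toy Log type; the real method takes Map[TopicAndPartition, Long]):

    // Toy model of LogManager.truncateTo: truncate known logs, skip unknown ones, checkpoint once.
    object TruncateToSketch extends App {
      case class TopicAndPartition(topic: String, partition: Int)
      class Log(var endOffset: Long) {
        def truncateTo(offset: Long): Unit = endOffset = math.min(endOffset, offset)
      }

      val logs = scala.collection.mutable.Map(TopicAndPartition("t", 0) -> new Log(100L))

      def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]): Unit = {
        for ((tp, offset) <- partitionAndOffsets; log <- logs.get(tp))  // missing logs are skipped
          log.truncateTo(offset)
        println("checkpointing recovery point offsets once for the whole batch")
      }

      truncateTo(Map(TopicAndPartition("t", 0) -> 42L, TopicAndPartition("t", 1) -> 7L))
      println(logs(TopicAndPartition("t", 0)).endOffset)  // 42
    }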
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 15b7bd3..394e981 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -18,14 +18,17 @@
package kafka.server
import scala.collection.mutable
+import scala.collection.Set
+import scala.collection.Map
import kafka.utils.Logging
import kafka.cluster.Broker
import kafka.metrics.KafkaMetricsGroup
+import kafka.common.TopicAndPartition
import com.yammer.metrics.core.Gauge
abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1)
extends Logging with KafkaMetricsGroup {
- // map of (source brokerid, fetcher Id per source broker) => fetcher
+ // map of (source broker_id, fetcher_id per source broker) => fetcher
private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
private val mapLock = new Object
this.logIdent = "[" + name + "] "
@@ -60,36 +63,43 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
)
private def getFetcherId(topic: String, partitionId: Int) : Int = {
- (topic.hashCode() + 31 * partitionId) % numFetchers
+ (31 * topic.hashCode() + partitionId) % numFetchers
}
// to be defined in subclass to create a specific fetcher
def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
- def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
+ def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
mapLock synchronized {
- var fetcherThread: AbstractFetcherThread = null
- val key = new BrokerAndFetcherId(sourceBroker, getFetcherId(topic, partitionId))
- fetcherThreadMap.get(key) match {
- case Some(f) => fetcherThread = f
- case None =>
- fetcherThread = createFetcherThread(key.fetcherId, sourceBroker)
- fetcherThreadMap.put(key, fetcherThread)
- fetcherThread.start
+ val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
+ BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
+ for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
+ var fetcherThread: AbstractFetcherThread = null
+ fetcherThreadMap.get(brokerAndFetcherId) match {
+ case Some(f) => fetcherThread = f
+ case None =>
+ fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
+ fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
+ fetcherThread.start
+ }
+
+ fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
+ topicAndPartition -> brokerAndInitOffset.initOffset
+ })
}
- fetcherThread.addPartition(topic, partitionId, initialOffset)
- info("Adding fetcher for partition [%s,%d], initOffset %d to broker %d with fetcherId
%d"
- .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
}
+
+ info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition,
brokerAndInitialOffset) =>
+ "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to
broker " + brokerAndInitialOffset.broker + "] "}))
}
- def removeFetcher(topic: String, partitionId: Int) {
- info("Removing fetcher for partition [%s,%d]".format(topic, partitionId))
+ def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) {
mapLock synchronized {
for ((key, fetcher) <- fetcherThreadMap) {
- fetcher.removePartition(topic, partitionId)
+ fetcher.removePartitions(partitions)
}
}
+ info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
}
def shutdownIdleFetcherThreads() {
@@ -115,4 +125,6 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
}
}
-case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
\ No newline at end of file
+case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
+
+case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
\ No newline at end of file
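addFetcherForPartitions groups the requested partitions by (source broker, fetcher id) so that each fetcher thread receives its whole batch in a single addPartitions call. A standalone sketch of just the grouping step, reusing the fetcher-id hash from the patched code (toy Broker and partition types):

    // Illustrative grouping of partitions onto fetcher threads, mirroring addFetcherForPartitions.
    object FetcherGroupingSketch extends App {
      case class Broker(id: Int)
      case class TopicAndPartition(topic: String, partition: Int)
      case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
      case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)

      val numFetchers = 3
      def getFetcherId(topic: String, partitionId: Int): Int =
        (31 * topic.hashCode() + partitionId) % numFetchers   // same hash as the patched code

      val partitionAndOffsets = Map(
        TopicAndPartition("t", 0) -> BrokerAndInitialOffset(Broker(2), 10L),
        TopicAndPartition("t", 1) -> BrokerAndInitialOffset(Broker(2), 20L))

      // One entry per fetcher thread; each value is the batch handed to that thread in one call
      val partitionsPerFetcher = partitionAndOffsets.groupBy { case (tp, bo) =>
        BrokerAndFetcherId(bo.broker, getFetcherId(tp.topic, tp.partition)) }

      partitionsPerFetcher.foreach { case (key, parts) => println(s"$key -> ${parts.keySet}") }
    }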
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index c64260f..bb2dd90 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,17 +19,19 @@ package kafka.server
import kafka.cluster.Broker
import collection.mutable
+import scala.collection.Set
+import scala.collection.Map
import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
-import java.util.concurrent.atomic.AtomicLong
import kafka.utils.{Pool, ShutdownableThread}
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
import kafka.utils.Utils.inLock
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.atomic.AtomicLong
/**
@@ -166,23 +168,26 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
}
- def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
+ def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
partitionMapLock.lockInterruptibly()
try {
- val topicPartition = TopicAndPartition(topic, partitionId)
- partitionMap.put(
- topicPartition,
- if (PartitionTopicInfo.isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(topicPartition) else initialOffset)
+ for ((topicAndPartition, offset) <- partitionAndOffsets) {
+ // If the partitionMap already has the topic/partition, then do not update the map with the old offset
+ if (!partitionMap.contains(topicAndPartition))
+ partitionMap.put(
+ topicAndPartition,
+ if (PartitionTopicInfo.isOffsetInvalid(offset)) handleOffsetOutOfRange(topicAndPartition) else offset)
+ }
partitionMapCond.signalAll()
} finally {
partitionMapLock.unlock()
}
}
- def removePartition(topic: String, partitionId: Int) {
+ def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
partitionMapLock.lockInterruptibly()
try {
- partitionMap.remove(TopicAndPartition(topic, partitionId))
+ topicAndPartitions.foreach(tp => partitionMap.remove(tp))
} finally {
partitionMapLock.unlock()
}
@@ -199,7 +204,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
}
class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
- private[this] var lagVal = new AtomicLong(-1L)
+ private[this] val lagVal = new AtomicLong(-1L)
newGauge(
metricId + "-ConsumerLag",
new Gauge[Long] {
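Note that addPartitions deliberately skips partitions the thread is already fetching, so a repeated add cannot rewind an offset that has since advanced. A minimal standalone sketch of that guard:

    // Illustrative only: keep the existing fetch offset if the partition is already tracked.
    object AddPartitionsSketch extends App {
      case class TopicAndPartition(topic: String, partition: Int)
      val partitionMap = scala.collection.mutable.Map(TopicAndPartition("t", 0) -> 50L)  // already fetching at offset 50

      def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]): Unit =
        for ((tp, offset) <- partitionAndOffsets if !partitionMap.contains(tp))
          partitionMap.put(tp, offset)

      addPartitions(Map(TopicAndPartition("t", 0) -> 0L, TopicAndPartition("t", 1) -> 0L))
      println(partitionMap)  // (t,0) stays at 50; (t,1) is added at 0
    }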
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd49e4f3/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ee1cc0c..7b8f89e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,8 +28,8 @@ import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.util.concurrent.TimeUnit
import kafka.common._
-import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
-import kafka.controller.KafkaController
+import kafka.api.{StopReplicaRequest, LeaderAndIsrRequest}
+import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
import org.apache.log4j.Logger
@@ -50,6 +50,7 @@ class ReplicaManager(val config: KafkaConfig,
private val allPartitions = new Pool[(String, Int), Partition]
private var leaderPartitions = new mutable.HashSet[Partition]()
private val leaderPartitionsLock = new Object
+ private val replicaStateChangeLock = new Object
val replicaFetcherManager = new ReplicaFetcherManager(config, this)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
@@ -116,7 +117,6 @@ class ReplicaManager(val config: KafkaConfig,
val errorCode = ErrorMapping.NoError
getReplica(topic, partitionId) match {
case Some(replica) =>
- replicaFetcherManager.removeFetcher(topic, partitionId)
/* TODO: handle deleteLog in a better way */
//if (deletePartition)
// logManager.deleteLog(topic, partition)
@@ -132,20 +132,26 @@ class ReplicaManager(val config: KafkaConfig,
}
def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
- val responseMap = new collection.mutable.HashMap[(String, Int), Short]
- if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
- stateChangeLogger.warn("Broker %d received stop replica request from an old controller
epoch %d."
- .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
- " Latest known controller epoch is %d " + controllerEpoch)
- (responseMap, ErrorMapping.StaleControllerEpochCode)
- } else {
- controllerEpoch = stopReplicaRequest.controllerEpoch
- val responseMap = new HashMap[(String, Int), Short]
- for((topic, partitionId) <- stopReplicaRequest.partitions){
- val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
- responseMap.put((topic, partitionId), errorCode)
+ replicaStateChangeLock synchronized {
+ val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+ if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
+ stateChangeLogger.warn("Broker %d received stop replica request from an old controller
epoch %d."
+ .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
+ " Latest known controller epoch is %d " + controllerEpoch)
+ (responseMap, ErrorMapping.StaleControllerEpochCode)
+ } else {
+ controllerEpoch = stopReplicaRequest.controllerEpoch
+ val responseMap = new HashMap[(String, Int), Short]
+ // First stop fetchers for all partitions, then stop the corresponding replicas
+ replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map{
+ case (topic, partition) => TopicAndPartition(topic, partition)
+ })
+ for((topic, partitionId) <- stopReplicaRequest.partitions){
+ val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
+ responseMap.put((topic, partitionId), errorCode)
+ }
+ (responseMap, ErrorMapping.NoError)
}
- (responseMap, ErrorMapping.NoError)
}
}
@@ -198,88 +204,176 @@ class ReplicaManager(val config: KafkaConfig,
}
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
- leaderAndISRRequest.partitionStateInfos.foreach(p =>
+ leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partition), stateInfo) =>
stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]"
.format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
- leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2)))
+ leaderAndISRRequest.controllerEpoch, topic, partition))}
info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest))
- val responseMap = new collection.mutable.HashMap[(String, Int), Short]
- if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
- stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with
an old controller epoch %d. Latest known controller epoch is %d"
- .format(localBrokerId, leaderAndISRRequest.controllerEpoch,
leaderAndISRRequest.correlationId, controllerEpoch))
- (responseMap, ErrorMapping.StaleControllerEpochCode)
- }else {
- val controllerId = leaderAndISRRequest.controllerId
- controllerEpoch = leaderAndISRRequest.controllerEpoch
- for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
- var errorCode = ErrorMapping.NoError
- val topic = topicAndPartition._1
- val partitionId = topicAndPartition._2
-
- val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
- try {
- if(requestedLeaderId == config.brokerId)
- makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
- else
- makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
- leaderAndISRRequest.correlationId)
- } catch {
- case e: Throwable =>
- val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId
%d received from controller %d " +
- "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId,
leaderAndISRRequest.controllerId,
- leaderAndISRRequest.controllerEpoch,
topicAndPartition)
- stateChangeLogger.error(errorMsg, e)
- errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+ replicaStateChangeLock synchronized {
+ val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+ if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
+ stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d
with an old controller epoch %d. Latest known controller epoch is %d"
+ .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId,
controllerEpoch))
+ (responseMap, ErrorMapping.StaleControllerEpochCode)
+ } else {
+ val controllerId = leaderAndISRRequest.controllerId
+ val correlationId = leaderAndISRRequest.correlationId
+ controllerEpoch = leaderAndISRRequest.controllerEpoch
+
+ // First check partition's leader epoch
+ val partitionleaderIsrAndControllerEpoch = new HashMap[Partition, LeaderIsrAndControllerEpoch]()
+ leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
+ val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
+ val partitionLeaderEpoch = partition.getLeaderEpoch()
+ if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
+ // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+ // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
+ partitionleaderIsrAndControllerEpoch.put(partition, partitionStateInfo.leaderIsrAndControllerEpoch)
+ } else {
+ // Otherwise record the error code in response
+ stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with
correlation id %d from " +
+ "controller %d epoch %d with an older leader epoch %d for partition [%s,%d],
current leader epoch is %d")
+ .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch,
+ partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic,
partition, partitionLeaderEpoch))
+ responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
+ }
}
- responseMap.put(topicAndPartition, errorCode)
- stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d
received from controller %d epoch %d for partition [%s,%d]"
- .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
leaderAndISRRequest.controllerEpoch,
- topicAndPartition._1, topicAndPartition._2))
- }
- info("Handled leader and isr request %s".format(leaderAndISRRequest))
- // we initialize highwatermark thread after the first leaderisrrequest. This ensures
that all the partitions
- // have been completely populated before starting the checkpointing there by avoiding
weird race conditions
- if (!hwThreadInitialized) {
- startHighWaterMarksCheckPointThread()
- hwThreadInitialized = true
+
+ val partitionsTobeLeader = partitionleaderIsrAndControllerEpoch
+ .filter{ case (partition, leaderIsrAndControllerEpoch) => leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
+ val partitionsTobeFollower = (partitionleaderIsrAndControllerEpoch -- partitionsTobeLeader.keys)
+
+ if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
+ if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
+
+ info("Handled leader and isr request %s".format(leaderAndISRRequest))
+ // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
+ // have been completely populated before starting the checkpointing there by avoiding weird race conditions
+ if (!hwThreadInitialized) {
+ startHighWaterMarksCheckPointThread()
+ hwThreadInitialized = true
+ }
+ replicaFetcherManager.shutdownIdleFetcherThreads()
+ (responseMap, ErrorMapping.NoError)
}
- replicaFetcherManager.shutdownIdleFetcherThreads()
- (responseMap, ErrorMapping.NoError)
}
}
- private def makeLeader(controllerId: Int, epoch:Int, topic: String, partitionId: Int,
- partitionStateInfo: PartitionStateInfo, correlationId: Int) = {
- val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+ /*
+ * Make the current broker to become leader for a given set of partitions by:
+ *
+ * 1. Stop fetchers for these partitions
+ * 2. Update the partition metadata in cache
+ * 3. Add these partitions to the leader partitions set
+ *
+ * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
+ * the error message will be set on each partition since we do not know which partition caused it
+ * TODO: the above may need to be fixed later
+ */
+ private def makeLeaders(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+ correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = {
stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from
controller %d epoch %d " +
- "starting the become-leader transition for partition [%s,%d]")
- .format(localBrokerId, correlationId, controllerId, epoch,
topic, partitionId))
- val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
- if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch,
correlationId)) {
- // also add this partition to the list of partitions for which the leader is the current
broker
+ "starting the become-leader transition for partitions %s")
+ .format(localBrokerId, correlationId, controllerId, epoch,
+ partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+
+ for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+ responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
+
+ try {
+ // First stop fetchers for all the partitions
+ replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+ stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-leader request from controller %d epoch %d"
+ .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+
+ // Update the partition information to be the leader
+ partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeLeader(controllerId, leaderIsrAndControllerEpoch, correlationId)}
+
+ // Finally add these partitions to the list of partitions for which the leader is the current broker
leaderPartitionsLock synchronized {
- leaderPartitions += partition
- }
+ leaderPartitions ++= partitionLeaderISRAndControllerEpochs.keySet
+ }
+ } catch {
+ case e: Throwable =>
+ val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId
%d received from controller %d " +
+ "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
+ stateChangeLogger.error(errorMsg, e)
+ // Re-throw the exception for it to be caught in KafkaApis
+ throw e
}
- stateChangeLogger.trace("Broker %d completed become-leader transition for partition [%s,%d]".format(localBrokerId,
topic, partitionId))
+
+ stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from
controller %d epoch %d " +
+ "for the become-leader transition for partitions %s")
+ .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
}
- private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
- partitionStateInfo: PartitionStateInfo, leaders: Set[Broker], correlationId: Int) {
- val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+ /*
+ * Make the current broker to become follower for a given set of partitions by:
+ *
+ * 1. Stop fetchers for these partitions
+ * 2. Truncate the log and checkpoint offsets for these partitions.
+ * 3. If the broker is not shutting down, add the fetcher to the new leaders
+ * 4. Update the partition metadata in cache
+ * 5. Remove these partitions from the leader partitions set
+ *
+ * The ordering of doing these steps makes sure that the replicas in transition will not
+ * take any more messages before checkpointing offsets so that all messages before the checkpoint
+ * are guaranteed to be flushed to disks
+ *
+ * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
+ * the error message will be set on each partition since we do not know which partition caused it
+ * TODO: the above may need to be fixed later
+ */
+ private def makeFollowers(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+ leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from
controller %d epoch %d " +
- "starting the become-follower transition for partition [%s,%d]")
- .format(localBrokerId, correlationId, controllerId, epoch,
topic, partitionId))
+ "starting the become-follower transition for partitions %s")
+ .format(localBrokerId, correlationId, controllerId, epoch,
+ partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+
+ for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+ responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
+
+ try {
+ replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+ stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
+ .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+
+ logManager.truncateTo(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
+ new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
+ })
+ stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d"
+ .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+
+ if (!isShuttingDown.get()) {
+ replicaFetcherManager.addFetcherForPartitions(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
+ new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == leaderISRAndControllerEpoch.leaderAndIsr.leader).get, partition.getReplica().get.logEndOffset)}
+ )
+ }
+ else {
+ stateChangeLogger.trace(("Broker %d ignored the become-follower state change with
correlation id %d from " +
+ "controller %d epoch %d since it is shutting down")
+ .format(localBrokerId, correlationId, controllerId, epoch))
+ }
+
+ partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
- val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
- if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, leaders, correlationId)) {
- // remove this replica's partition from the ISR expiration queue
leaderPartitionsLock synchronized {
- leaderPartitions -= partition
+ leaderPartitions --= partitionLeaderISRAndControllerEpochs.keySet
}
+ } catch {
+ case e: Throwable =>
+ val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId
%d received from controller %d " +
+ "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
+ stateChangeLogger.error(errorMsg, e)
+ // Re-throw the exception for it to be caught in KafkaApis
+ throw e
}
- stateChangeLogger.trace("Broker %d completed the become-follower transition for partition
[%s,%d]".format(localBrokerId, topic, partitionId))
+
+ stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from
controller %d epoch %d " +
+ "for the become-follower transition for partitions %s")
+ .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
}
private def maybeShrinkIsr(): Unit = {
@@ -307,7 +401,7 @@ class ReplicaManager(val config: KafkaConfig,
val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
for((dir, reps) <- replicasByDir) {
- val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
+ val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
try {
highWatermarkCheckpoints(dir).write(hwms)
} catch {