kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-1546; Automate replica lag tuning; reviewed by Joel Koshy, Neha Narkhede, Jun Rao and Guozhang Wang
Date Sat, 04 Apr 2015 01:40:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 48f997047 -> 15b93a410


KAFKA-1546; Automate replica lag tuning; reviewed by Joel Koshy, Neha
Narkhede, Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/15b93a41
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/15b93a41
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/15b93a41

Branch: refs/heads/trunk
Commit: 15b93a410a8e988e09dc147c6d5250cbbe328b26
Parents: 48f9970
Author: Aditya Auradkar <aauradkar@linkedin.com>
Authored: Fri Apr 3 18:39:27 2015 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Apr 3 18:40:17 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 75 ++++++++++---------
 core/src/main/scala/kafka/cluster/Replica.scala | 25 +++++--
 core/src/main/scala/kafka/log/Log.scala         |  2 +-
 .../main/scala/kafka/server/FetchDataInfo.scala |  2 +-
 .../main/scala/kafka/server/KafkaConfig.scala   | 10 +--
 .../scala/kafka/server/ReplicaManager.scala     | 62 +++++++++++----
 .../unit/kafka/server/ISRExpirationTest.scala   | 79 ++++++++++++++++----
 .../kafka/server/KafkaConfigConfigDefTest.scala |  2 -
 .../unit/kafka/server/LogRecoveryTest.scala     |  1 -
 .../unit/kafka/server/SimpleFetchTest.scala     |  7 +-
 10 files changed, 177 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c4bf48a..6d142d6 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,7 +22,7 @@ import kafka.utils.Utils.{inReadLock,inWriteLock}
 import kafka.admin.AdminUtils
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
-import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
+import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, LogReadResult,
ReplicaManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
@@ -33,7 +33,6 @@ import scala.collection.immutable.Set
 
 import com.yammer.metrics.core.Gauge
 
-
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR,
RAR
  */
@@ -51,6 +50,7 @@ class Partition(val topic: String,
   @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
   @volatile var leaderReplicaIdOpt: Option[Int] = None
   @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
+
   /* Epoch of the controller that last changed the leader. This needs to be initialized correctly
upon broker startup.
    * One way of doing that is through the controller's start replica state change command.
When a new broker starts up
    * the controller sends it a start replica command containing the leader for each partition
that the broker hosts.
@@ -182,7 +182,8 @@ class Partition(val topic: String,
       val newLeaderReplica = getReplica().get
       newLeaderReplica.convertHWToLocalOffsetMetadata()
       // reset log end offset for remote replicas
-      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset =
LogOffsetMetadata.UnknownOffsetMetadata)
+      assignedReplicas.foreach(r =>
+        if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult))
       // we may need to increment high watermark since ISR could be down to 1
       maybeIncrementLeaderHW(newLeaderReplica)
       if (topic == OffsetManager.OffsetsTopicName)
@@ -234,21 +235,27 @@ class Partition(val topic: String,
   /**
    * Update the log end offset of a certain replica of this partition
    */
-  def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) {
+  def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) {
     getReplica(replicaId) match {
       case Some(replica) =>
-        replica.logEndOffset = offset
-
+        replica.updateLogReadResult(logReadResult)
         // check if we need to expand ISR to include this replica
         // if it is not in the ISR yet
         maybeExpandIsr(replicaId)
 
         debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
-          .format(replicaId, offset.messageOffset, TopicAndPartition(topic, partitionId)))
+          .format(replicaId,
+                  logReadResult.info.fetchOffsetMetadata.messageOffset,
+                  TopicAndPartition(topic, partitionId)))
       case None =>
         throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's
position %d since the replica" +
-          " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId,
replicaId,
-          offset.messageOffset, assignedReplicas().map(_.brokerId).mkString(","), topic,
partitionId))
+          " is not recognized to be one of the assigned replicas %s for partition [%s,%d]")
+          .format(localBrokerId,
+                  replicaId,
+                  logReadResult.info.fetchOffsetMetadata,
+                  assignedReplicas().map(_.brokerId).mkString(","),
+                  topic,
+                  partitionId))
     }
   }
 
@@ -264,21 +271,18 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
-          // For a replica to get added back to ISR, it has to satisfy 3 conditions-
-          // 1. It is not already in the ISR
-          // 2. It is part of the assigned replica list. See KAFKA-1097
-          // 3. It's log end offset >= leader's high watermark
-          if (!inSyncReplicas.contains(replica) &&
-            assignedReplicas.map(_.brokerId).contains(replicaId) &&
-            replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
-            // expand ISR
+          if(!inSyncReplicas.contains(replica) &&
+             assignedReplicas.map(_.brokerId).contains(replicaId) &&
+                  replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
             val newInSyncReplicas = inSyncReplicas + replica
             info("Expanding ISR for partition [%s,%d] from %s to %s"
-                 .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
newInSyncReplicas.map(_.brokerId).mkString(",")))
+                         .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
+                                 newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in ZK and cache
             updateIsr(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
           }
+
           // check if the HW of the partition can now be incremented
          // since the replica may now be in the ISR and its LEO has just incremented
           maybeIncrementLeaderHW(leaderReplica)
@@ -353,11 +357,11 @@ class Partition(val topic: String,
     }
   }
 
-  def maybeShrinkIsr(replicaMaxLagTimeMs: Long,  replicaMaxLagMessages: Long) {
+  def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
     inWriteLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
-          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs,
replicaMaxLagMessages)
+          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
           if(outOfSyncReplicas.size > 0) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.size > 0)
@@ -374,27 +378,26 @@ class Partition(val topic: String,
     }
   }
 
-  def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages:
Long): Set[Replica] = {
+  def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
     /**
-     * there are two cases that need to be handled here -
-     * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs
ms,
+     * there are two cases that will be handled here -
+     * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
      *                     the follower is stuck and should be removed from the ISR
-     * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader
by keepInSyncMessages, the
-     *                     follower is not catching up and should be removed from the ISR
+     * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs
ms,
+     *                    then the follower is lagging and should be removed from the ISR
+     * Both these cases are handled by checking the lastCaughtUpTimeMs which represents
+     * the last time when the replica was fully caught up. If either of the above conditions
+     * is violated, that replica is considered to be out of sync
+     *
      **/
     val leaderLogEndOffset = leaderReplica.logEndOffset
     val candidateReplicas = inSyncReplicas - leaderReplica
-    // Case 1 above
-    val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs)
> keepInSyncTimeMs)
-    if(stuckReplicas.size > 0)
-      debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
-    // Case 2 above
-    val slowReplicas = candidateReplicas.filter(r =>
-      r.logEndOffset.messageOffset >= 0 &&
-      leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
-    if(slowReplicas.size > 0)
-      debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
-    stuckReplicas ++ slowReplicas
+
+    val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs)
> maxLagMs)
+    if(laggingReplicas.size > 0)
+      debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId),
laggingReplicas.map(_.brokerId).mkString(",")))
+
+    laggingReplicas
   }
 
   def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index bd13c20..740e835 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -19,7 +19,7 @@ package kafka.cluster
 
 import kafka.log.Log
 import kafka.utils.{SystemTime, Time, Logging}
-import kafka.server.LogOffsetMetadata
+import kafka.server.{LogReadResult, LogOffsetMetadata}
 import kafka.common.KafkaException
 
 import java.util.concurrent.atomic.AtomicLong
@@ -34,8 +34,6 @@ class Replica(val brokerId: Int,
   // the log end offset value, kept in all replicas;
   // for local replica it is the log's end offset, for remote replicas its value is only
updated by follower fetch
   @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
-  // the time when log offset is updated
-  private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds)
 
   val topic = partition.topic
   val partitionId = partition.partitionId
@@ -47,12 +45,27 @@ class Replica(val brokerId: Int,
     }
   }
 
-  def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
+  private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
+
+  def lastCaughtUpTimeMs = lastCaughtUpTimeMsUnderlying.get()
+
+  def updateLogReadResult(logReadResult : LogReadResult) {
+    logEndOffset = logReadResult.info.fetchOffsetMetadata
+
+    /* If the request read up to the log end offset snapshot when the read was initiated,
+     * set the lastCaughtUpTimeMsUnderlying to the current time.
+     * This means that the replica is fully caught up.
+     */
+    if(logReadResult.isReadFromLogEnd) {
+      lastCaughtUpTimeMsUnderlying.set(time.milliseconds)
+    }
+  }
+
+  private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
     if (isLocal) {
       throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local
replica %d".format(topic, partitionId, brokerId))
     } else {
       logEndOffsetMetadata = newLogEndOffset
-      logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
       trace("Setting log end offset for replica %d for partition [%s,%d] to [%s]"
         .format(brokerId, topic, partitionId, logEndOffsetMetadata))
     }
@@ -64,8 +77,6 @@ class Replica(val brokerId: Int,
     else
       logEndOffsetMetadata
 
-  def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
-
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
       highWatermarkMetadata = newHighWatermark

http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 06b8ecc..a0745be 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -470,7 +470,7 @@ class Log(val dir: File,
   def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
     try {
       val fetchDataInfo = read(offset, 1)
-      fetchDataInfo.fetchOffset
+      fetchDataInfo.fetchOffsetMetadata
     } catch {
       case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/main/scala/kafka/server/FetchDataInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
index 26f278f..1a8a604 100644
--- a/core/src/main/scala/kafka/server/FetchDataInfo.scala
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -19,4 +19,4 @@ package kafka.server
 
 import kafka.message.MessageSet
 
-case class FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet)
+case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: MessageSet)

http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 422451a..6217302 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -87,7 +87,6 @@ object Defaults {
   val ControllerMessageQueueSize = Int.MaxValue
   val DefaultReplicationFactor = 1
   val ReplicaLagTimeMaxMs = 10000L
-  val ReplicaLagMaxMessages = 4000
   val ReplicaSocketTimeoutMs = ConsumerConfig.SocketTimeout
   val ReplicaSocketReceiveBufferBytes = ConsumerConfig.SocketBufferSize
   val ReplicaFetchMaxBytes = ConsumerConfig.FetchSize
@@ -194,7 +193,6 @@ object KafkaConfig {
   val ControllerMessageQueueSizeProp = "controller.message.queue.size"
   val DefaultReplicationFactorProp = "default.replication.factor"
   val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms"
-  val ReplicaLagMaxMessagesProp = "replica.lag.max.messages"
   val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms"
   val ReplicaSocketReceiveBufferBytesProp = "replica.socket.receive.buffer.bytes"
   val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
@@ -303,8 +301,8 @@ object KafkaConfig {
   val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
   val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
   val DefaultReplicationFactorDoc = "default replication factors for automatically created
topics"
-  val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests during this
time, the leader will remove the follower from isr"
-  val ReplicaLagMaxMessagesDoc = "If the lag in messages between a leader and a follower
exceeds this number, the leader will remove the follower from isr"
+  val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed
up to the leader's log end offset for at least this time," +
+    " the leader will remove the follower from isr"
   val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should
be at least replica.fetch.wait.max.ms"
   val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests"
 val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to fetch"
@@ -427,7 +425,6 @@ object KafkaConfig {
       .define(ControllerMessageQueueSizeProp, INT, Defaults.ControllerMessageQueueSize, MEDIUM,
ControllerMessageQueueSizeDoc)
       .define(DefaultReplicationFactorProp, INT, Defaults.DefaultReplicationFactor, MEDIUM,
DefaultReplicationFactorDoc)
       .define(ReplicaLagTimeMaxMsProp, LONG, Defaults.ReplicaLagTimeMaxMs, HIGH, ReplicaLagTimeMaxMsDoc)
-      .define(ReplicaLagMaxMessagesProp, LONG, Defaults.ReplicaLagMaxMessages, HIGH, ReplicaLagMaxMessagesDoc)
       .define(ReplicaSocketTimeoutMsProp, INT, Defaults.ReplicaSocketTimeoutMs, HIGH, ReplicaSocketTimeoutMsDoc)
       .define(ReplicaSocketReceiveBufferBytesProp, INT, Defaults.ReplicaSocketReceiveBufferBytes,
HIGH, ReplicaSocketReceiveBufferBytesDoc)
       .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, HIGH, ReplicaFetchMaxBytesDoc)
@@ -546,7 +543,6 @@ object KafkaConfig {
       controllerMessageQueueSize = parsed.get(ControllerMessageQueueSizeProp).asInstanceOf[Int],
       defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int],
       replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long],
-      replicaLagMaxMessages = parsed.get(ReplicaLagMaxMessagesProp).asInstanceOf[Long],
       replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int],
       replicaSocketReceiveBufferBytes = parsed.get(ReplicaSocketReceiveBufferBytesProp).asInstanceOf[Int],
       replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int],
@@ -687,7 +683,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
                   val controllerMessageQueueSize: Int = Defaults.ControllerMessageQueueSize,
                   val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor,
                   val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs,
-                  val replicaLagMaxMessages: Long = Defaults.ReplicaLagMaxMessages,
                   val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs,
                   val replicaSocketReceiveBufferBytes: Int = Defaults.ReplicaSocketReceiveBufferBytes,
                   val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes,
@@ -856,7 +851,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     props.put(ControllerMessageQueueSizeProp, controllerMessageQueueSize.toString)
     props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString)
     props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
-    props.put(ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
     props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString)
     props.put(ReplicaSocketReceiveBufferBytesProp, replicaSocketReceiveBufferBytes.toString)
     props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 44f0026..6e43622 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -52,12 +52,36 @@ case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable]
= None)
 
 /*
  * Result metadata of a log read operation on the log
+ * @param info @FetchDataInfo returned by the @Log read
+ * @param hw high watermark of the local replica
+ * @param readSize amount of data that was read from the log i.e. size of the fetch
+ * @param isReadFromLogEnd true if the request read up to the log end offset snapshot
+ *                         when the read was initiated, false otherwise
+ * @param error Exception if error encountered while reading from the log
  */
-case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, error: Option[Throwable]
= None) {
+case class LogReadResult(info: FetchDataInfo,
+                         hw: Long,
+                         readSize: Int,
+                         isReadFromLogEnd : Boolean,
+                         error: Option[Throwable] = None) {
+
   def errorCode = error match {
     case None => ErrorMapping.NoError
     case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
   }
+
+  override def toString = {
+    "Fetch Data: [%s], HW: [%d], readSize: [%d], isReadFromLogEnd: [%b], error: [%s]"
+            .format(info, hw, readSize, isReadFromLogEnd, error)
+  }
+}
+
+object LogReadResult {
+  val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata,
+                                                         MessageSet.Empty),
+                                           -1L,
+                                           -1,
+                                           false)
 }
 
 object ReplicaManager {
@@ -406,7 +430,7 @@ class ReplicaManager(val config: KafkaConfig,
     // if the fetch comes from the follower,
     // update its corresponding log end offset
     if(Request.isValidBrokerId(replicaId))
-      updateFollowerLEOs(replicaId, logReadResults.mapValues(_.info.fetchOffset))
+      updateFollowerLogReadResults(replicaId, logReadResults)
 
     // check if this fetch request can be satisfied right away
     val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
@@ -424,7 +448,7 @@ class ReplicaManager(val config: KafkaConfig,
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
-        (topicAndPartition, FetchPartitionStatus(result.info.fetchOffset, fetchInfo.get(topicAndPartition).get))
+        (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo.get(topicAndPartition).get))
       }
       val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted,
isFromFollower, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
@@ -466,7 +490,13 @@ class ReplicaManager(val config: KafkaConfig,
           else
             None
 
-          // read on log
+          /* Read the LogOffsetMetadata prior to performing the read from the log.
+           * We use the LogOffsetMetadata to determine if a particular replica is in-sync
or not.
+           * Using the log end offset after performing the read can lead to a race condition
+           * where data gets appended to the log immediately after the replica has consumed
from it.
+           * This can cause a replica to always be out of sync.
+           */
+          val initialLogEndOffset = localReplica.logEndOffset
           val logReadInfo = localReplica.log match {
             case Some(log) =>
               log.read(offset, fetchSize, maxOffsetOpt)
@@ -475,23 +505,25 @@ class ReplicaManager(val config: KafkaConfig,
               FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
           }
 
-          LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize,
None)
+          val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset
<= 0
+
+          LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize,
readToEndOfLog, None)
         } catch {
           // NOTE: Failed fetch requests metric is not incremented for known exceptions since
it
           // is supposed to indicate un-expected failure of a broker in handling a fetch
request
           case utpe: UnknownTopicOrPartitionException =>
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, Some(utpe))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, false, Some(utpe))
           case nle: NotLeaderForPartitionException =>
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, Some(nle))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, false, Some(nle))
           case rnae: ReplicaNotAvailableException =>
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, Some(rnae))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, false, Some(rnae))
           case oor : OffsetOutOfRangeException =>
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, Some(oor))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, false, Some(oor))
           case e: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
             BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
             error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic,
partition, offset))
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, Some(e))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty),
-1L, fetchSize, false, Some(e))
         }
       (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
     }
@@ -748,15 +780,15 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the
ISR")
-    allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs,
config.replicaLagMaxMessages))
+    allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
   }
 
-  private def updateFollowerLEOs(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata])
{
-    debug("Recording follower broker %d log end offsets: %s ".format(replicaId, offsets))
-    offsets.foreach { case (topicAndPartition, offset) =>
+  private def updateFollowerLogReadResults(replicaId: Int, readResults: Map[TopicAndPartition,
LogReadResult]) {
+    debug("Recording follower broker %d log read results: %s ".format(replicaId, readResults))
+    readResults.foreach { case (topicAndPartition, readResult) =>
       getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
         case Some(partition) =>
-          partition.updateReplicaLEO(replicaId, offset)
+          partition.updateReplicaLogReadResult(replicaId, readResult)
 
           // for producer requests with ack > 1, we need to check
           // if they can be unblocked after some follower's log end offsets have moved

http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 9215235..c1d168a 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -27,18 +27,18 @@ import kafka.log.Log
 import org.junit.Assert._
 import kafka.utils._
 import java.util.concurrent.atomic.AtomicBoolean
+import kafka.message.MessageSet
+
 
 class IsrExpirationTest extends JUnit3Suite {
 
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
-  val replicaLagMaxMessages = 10L
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
-  overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
   val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))
   val topic = "foo"
 
@@ -56,6 +56,9 @@ class IsrExpirationTest extends JUnit3Suite {
     super.tearDown()
   }
 
+  /*
+   * Test the case where a follower is caught up but stops making requests to the leader.
Once beyond the configured time limit, it should fall out of ISR
+   */
   def testIsrExpirationForStuckFollowers() {
     val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
 
@@ -64,36 +67,84 @@ class IsrExpirationTest extends JUnit3Suite {
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
-    // let the follower catch up to 10
-    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new
LogOffsetMetadata(10L))
-    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs,
configs.head.replicaLagMaxMessages)
+    // let the follower catch up to the Leader logEndOffset (15)
+    (partition0.assignedReplicas() - leaderReplica).foreach(
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
+                                                                 MessageSet.Empty),
+                                                   -1L,
+                                                   -1,
+                                                   true)))
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
     // let some time pass
     time.sleep(150)
 
-    // now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND
the follower hasn't
-    // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
-    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs,
configs.head.replicaLagMaxMessages)
+    // now follower hasn't pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+    EasyMock.verify(log)
+  }
+
+  /*
+   * Test the case where a follower never makes a fetch request. It should fall out of ISR
because it will be declared stuck
+   */
+  def testIsrExpirationIfNoFetchRequestMade() {
+    val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L
+
+    // create one partition and all replicas
+    val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+
+    // Let enough time pass for the replica to be considered stuck
+    time.sleep(150)
+
+    val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
     EasyMock.verify(log)
   }
 
+  /*
+   * Test the case where a follower continually makes fetch requests but is unable to catch
up. It should fall out of the ISR.
+   * However, any time it catches up to the LogEndOffset it should be back in the ISR
+   */
   def testIsrExpirationForSlowFollowers() {
     // create leader replica
-    val log = getLogWithLogEndOffset(15L, 1)
+    val log = getLogWithLogEndOffset(15L, 4)
     // add one partition
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
-    // set remote replicas leo to something low, like 4
-    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new
LogOffsetMetadata(4L))
 
-    // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since
the gap it larger than
-    // replicaMaxLagBytes, the follower is out of sync.
-    val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs,
configs.head.replicaLagMaxMessages)
+    // Make the remote replica not read to the end of the log. It should not be out of sync
for at least 100 ms
+    for(replica <- (partition0.assignedReplicas() - leaderReplica))
+      replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L),
MessageSet.Empty), -1L, -1, false))
+
+    // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of
the log.
+    // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate
the case where the replica is lagging but is not stuck
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
+    time.sleep(75)
+
+    (partition0.assignedReplicas() - leaderReplica).foreach(
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L),
MessageSet.Empty), -1L, -1, false)))
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
+    time.sleep(75)
+
+    // The replicas will no longer be in ISR
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
 
+    // Now actually make a fetch to the end of the log. The replicas should be back in ISR
+    (partition0.assignedReplicas() - leaderReplica).foreach(
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
MessageSet.Empty), -1L, -1, true)))
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
     EasyMock.verify(log)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index 191251d..150c311 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -114,7 +114,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
     Assert.assertEquals(expectedConfig.controllerMessageQueueSize, actualConfig.controllerMessageQueueSize)
     Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor)
     Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs)
-    Assert.assertEquals(expectedConfig.replicaLagMaxMessages, actualConfig.replicaLagMaxMessages)
     Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs)
     Assert.assertEquals(expectedConfig.replicaSocketReceiveBufferBytes, actualConfig.replicaSocketReceiveBufferBytes)
     Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes)
@@ -311,7 +310,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
         case KafkaConfig.ControllerMessageQueueSizeProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
-        case KafkaConfig.ReplicaLagMaxMessagesProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number", "-2")
         case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
         case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")

http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 92d6b2c..de255e3 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -40,7 +40,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
-  overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/15b93a41/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index efb4573..519888e 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -21,7 +21,7 @@ import kafka.utils._
 import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
-import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
 
 import scala.Some
 import java.util.{Properties, Collections}
@@ -42,7 +42,6 @@ class SimpleFetchTest extends JUnit3Suite {
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
-  overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
 
   val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))
 
@@ -78,6 +77,7 @@ class SimpleFetchTest extends JUnit3Suite {
     // create the log which takes read with either HW max offset or none max offset
     val log = EasyMock.createMock(classOf[Log])
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
+    EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
     EasyMock.expect(log.read(0, fetchSize, Some(partitionHW))).andReturn(
       new FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
@@ -108,7 +108,8 @@ class SimpleFetchTest extends JUnit3Suite {
 
     // create the follower replica with defined log end offset
     val followerReplica= new Replica(configs(1).brokerId, partition, time)
-    followerReplica.logEndOffset = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
+    val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
+    followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MessageSet.Empty),
-1L, -1, true))
 
     // add both of them to ISR
     val allReplicas = List(leaderReplica, followerReplica)


Mime
View raw message