kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1154; replicas may not have consistent data after becoming follower; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede
Date Mon, 02 Dec 2013 21:57:15 GMT
Updated Branches:
  refs/heads/trunk 224f192c7 -> 7c7951426


kafka-1154; replicas may not have consistent data after becoming follower; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c795142
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c795142
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c795142

Branch: refs/heads/trunk
Commit: 7c7951426df5fe8a65b176fe49584868e7af3b4f
Parents: 224f192
Author: Jun Rao <junrao@gmail.com>
Authored: Mon Dec 2 13:58:10 2013 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Dec 2 13:58:10 2013 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/FetchRequest.scala |  2 +-
 .../scala/kafka/api/RequestOrResponse.scala     |  3 +
 .../src/main/scala/kafka/server/KafkaApis.scala |  6 +-
 .../scala/kafka/server/ReplicaManager.scala     | 25 ++++----
 .../kafka/tools/ReplicaVerificationTool.scala   | 60 +++++++++++---------
 5 files changed, 52 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7c795142/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index fb2a230..d41a705 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -132,7 +132,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
     })
   }
 
-  def isFromFollower = replicaId != Request.OrdinaryConsumerId && replicaId != Request.DebuggingConsumerId
+  def isFromFollower = Request.isReplicaIdFromFollower(replicaId)
 
   def isFromOrdinaryConsumer = replicaId == Request.OrdinaryConsumerId
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c795142/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index b62330b..ba59c31 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -24,6 +24,9 @@ import kafka.utils.Logging
 object Request {
   val OrdinaryConsumerId: Int = -1
   val DebuggingConsumerId: Int = -2
+
+  // Followers use broker id as the replica id, which are non-negative int.
+  def isReplicaIdFromFollower(replicaId: Int): Boolean = (replicaId >= 0)
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c795142/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 80a70f1..c9f92a2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -392,10 +392,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       replicaManager.getLeaderReplicaIfLocal(topic, partition)
     trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
     val maxOffsetOpt = 
-      if (fromReplicaId == Request.OrdinaryConsumerId)
-        Some(localReplica.highWatermark)
-      else
+      if (Request.isReplicaIdFromFollower(fromReplicaId))
         None
+      else
+        Some(localReplica.highWatermark)
     val messages = localReplica.log match {
       case Some(log) =>
         log.read(offset, maxSize, maxOffsetOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c795142/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 54f6e16..49a0fa8 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -314,11 +314,11 @@ class ReplicaManager(val config: KafkaConfig,
   /*
    * Make the current broker to become follower for a given set of partitions by:
    *
-   * 1. Stop fetchers for these partitions
-   * 2. Truncate the log and checkpoint offsets for these partitions.
-   * 3. If the broker is not shutting down, add the fetcher to the new leaders
-   * 4. Update the partition metadata in cache
-   * 5. Remove these partitions from the leader partitions set
+   * 1. Remove these partitions from the leader partitions set.
+   * 2. Mark the replicas as followers so that no more data can be added from the producer clients.
+   * 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads.
+   * 4. Truncate the log and checkpoint offsets for these partitions.
+   * 5. If the broker is not shutting down, add the fetcher to the new leaders.
    *
    * The ordering of doing these steps make sure that the replicas in transition will not
    * take any more messages before checkpointing offsets so that all messages before the checkpoint
@@ -326,7 +326,6 @@ class ReplicaManager(val config: KafkaConfig,
    *
    * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
    * the error message will be set on each partition since we do not know which partition caused it
-   *  TODO: the above may need to be fixed later
    */
   private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
                             leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
@@ -339,6 +338,13 @@ class ReplicaManager(val config: KafkaConfig,
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
     try {
+      leaderPartitionsLock synchronized {
+        leaderPartitions --= partitionState.keySet
+      }
+
+      partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) =>
+        partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
+
       replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
       stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
         .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
@@ -372,13 +378,6 @@ class ReplicaManager(val config: KafkaConfig,
           "controller %d epoch %d since it is shutting down")
           .format(localBrokerId, correlationId, controllerId, epoch))
       }
-
-      partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) =>
-        partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
-
-      leaderPartitionsLock synchronized {
-        leaderPartitions --= partitionState.keySet
-      }
     } catch {
       case e: Throwable =>
         val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c795142/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index f1f139e..5e8c56d 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -269,34 +269,40 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       while (isMessageInAllReplicas) {
         var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
         for ( (replicaId, messageIterator) <- messageIteratorMap) {
-          if (messageIterator.hasNext) {
-            val messageAndOffset = messageIterator.next()
-
-            // only verify up to the high watermark
-            if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
-              isMessageInAllReplicas = false
-            else {
-              messageInfoFromFirstReplicaOpt match {
-                case None =>
-                  messageInfoFromFirstReplicaOpt = Some(
-                    MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum))
-                case Some(messageInfoFromFirstReplica) =>
-                  if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) {
-                    println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
-                      + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
-                      + messageInfoFromFirstReplica.offset + " doesn't match replica "
-                      + replicaId + "'s offset " + messageAndOffset.offset)
-                    System.exit(1)
-                  }
-                  if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum)
-                    println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
-                      + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica "
-                      + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
-                      + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum)
+          try {
+            if (messageIterator.hasNext) {
+              val messageAndOffset = messageIterator.next()
+
+              // only verify up to the high watermark
+              if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
+                isMessageInAllReplicas = false
+              else {
+                messageInfoFromFirstReplicaOpt match {
+                  case None =>
+                    messageInfoFromFirstReplicaOpt = Some(
+                      MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum))
+                  case Some(messageInfoFromFirstReplica) =>
+                    if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) {
+                      println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
+                        + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
+                        + messageInfoFromFirstReplica.offset + " doesn't match replica "
+                        + replicaId + "'s offset " + messageAndOffset.offset)
+                      System.exit(1)
+                    }
+                    if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum)
+                      println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
+                        + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica "
+                        + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
+                        + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum)
+                }
               }
-            }
-          } else
-            isMessageInAllReplicas = false
+            } else
+              isMessageInAllReplicas = false
+          } catch {
+            case t =>
+              throw new RuntimeException("Error in processing replica %d in partition %s at offset %d."
+              .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t)
+          }
         }
         if (isMessageInAllReplicas) {
           val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset


Mime
View raw message