kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject git commit: KAFKA-1178 Replica fetcher thread dies while becoming follower; reviewed by Jun Rao and Guozhang Wang
Date Wed, 11 Dec 2013 19:18:29 GMT
Updated Branches:
  refs/heads/trunk 32aae7202 -> 6bc290f66


KAFKA-1178 Replica fetcher thread dies while becoming follower; reviewed by Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6bc290f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6bc290f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6bc290f6

Branch: refs/heads/trunk
Commit: 6bc290f66d738a9133afe93e2a6ac64f1344c111
Parents: 32aae72
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Wed Dec 11 11:18:24 2013 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Dec 11 11:18:24 2013 -0800

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala |  2 +-
 .../main/scala/kafka/cluster/Partition.scala    |  2 +-
 .../kafka/controller/KafkaController.scala      | 17 +++++++-----
 .../kafka/server/AbstractFetcherManager.scala   |  4 +--
 .../scala/kafka/server/ReplicaManager.scala     | 28 +++++++++++++-------
 .../test/scala/unit/kafka/admin/AdminTest.scala | 20 ++++++++++++--
 6 files changed, 51 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6bc290f6/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 65c04ed..2637586 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -122,7 +122,7 @@ object ReassignPartitionsCommand extends Logging {
       .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
     // start the reassignment
     if(reassignPartitionsCommand.reassignPartitions())
-      println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+      println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
     else
       println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
   }
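
Note on the change above: the preceding println (the .format(ZkUtils.getPartitionReassignmentZkData(...)) context line) already renders the current assignment as zookeeper JSON, so the success message now uses the same form instead of dumping the raw Scala map. A sketch of what the output likely looks like, assuming the usual /admin/reassign_partitions JSON layout (illustrative topic and replica values):

    {"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[0,2,3]}]}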

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bc290f6/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 13f48ba..5c9307d 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -72,7 +72,7 @@ class Partition(val topic: String,
     leaderIsrUpdateLock synchronized {
       leaderReplicaIfLocal() match {
         case Some(_) =>
-          inSyncReplicas.size < replicationFactor
+          inSyncReplicas.size < assignedReplicas.size
         case None =>
           false
       }
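
Note on the change above: "under-replicated" is now judged against the current assigned replica list rather than the static replication factor. During a reassignment the assignment temporarily grows to AR + RAR, so comparing the ISR size with the configured factor can misreport the partition's health. A minimal sketch of the corrected predicate, using simplified stand-in types rather than the real Partition internals:

    // A leader counts the partition as under-replicated when the in-sync set
    // is smaller than whatever is currently assigned (broker ids as Ints).
    def isUnderReplicated(inSyncReplicas: Set[Int], assignedReplicas: Set[Int]): Boolean =
      inSyncReplicas.size < assignedReplicas.size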

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bc290f6/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index fd92c65..a1f7ff4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -361,12 +361,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * Reassigning replicas for a partition goes through a few stages -
    * RAR = Reassigned replicas
    * AR = Original list of replicas for partition
-   * 1. Write new AR = AR + RAR
+   * 1. Write new AR = AR + RAR. At this time, update the leader epoch in zookeeper and send a LeaderAndIsr request with
+   *    AR = AR + RAR to all replicas in (AR + RAR)
   * 2. Start new replicas RAR - AR.
   * 3. Wait until new replicas are in sync with the leader
-   * 4. If the leader is not in RAR, elect a new leader from RAR
-   * 5. Stop old replicas AR - RAR
-   * 6. Write new AR = RAR
+   * 4. If the leader is not in RAR, elect a new leader from RAR. If a new leader needs to be elected from RAR, a LeaderAndIsr
+   *    request will be sent. If not, then the leader epoch will be incremented in zookeeper and a LeaderAndIsr request will
+   *    be sent. In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any
+   *    replica in RAR - AR back in the ISR
+   * 5. Stop old replicas AR - RAR. As part of this, we make 2 state changes, OfflineReplica and NonExistentReplica. As part
+   *    of the OfflineReplica state change, we shrink the ISR to remove AR - RAR in zookeeper and send a LeaderAndIsr ONLY to
+   *    the leader to notify it of the shrunk ISR. After that, we send a StopReplica (delete = false) to the replicas in
+   *    AR - RAR. Currently, the NonExistentReplica state change is a NO-OP
+   * 6. Write new AR = RAR. As part of this, we finally change the AR in zookeeper to RAR.
    * 7. Remove partition from the /admin/reassign_partitions path
    */
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
@@ -677,8 +684,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     brokerRequestBatch.newBatch()
     updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
-        // send the shrunk assigned replica list to all the replicas, including the leader, so that it no longer
-        // allows old replicas to enter ISR
         brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
           topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
         brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
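
The set arithmetic in the reassignment stages documented above is easy to misread, so here is a small self-contained sketch of the replica sets involved (illustrative broker ids, not values from the patch):

    val ar  = Set(1, 2, 3)              // AR: original assignment
    val rar = Set(2, 3, 4)              // RAR: reassignment target
    val expandedAssignment = ar ++ rar  // stage 1: the AR written to zookeeper becomes AR + RAR
    val replicasToStart    = rar -- ar  // stage 2: RAR - AR, new replicas that must catch up
    val replicasToStop     = ar -- rar  // stage 5: AR - RAR, shrunk out of the ISR and stopped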

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bc290f6/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 394e981..9390edf 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,7 +20,7 @@ package kafka.server
 import scala.collection.mutable
 import scala.collection.Set
 import scala.collection.Map
-import kafka.utils.Logging
+import kafka.utils.{Utils, Logging}
 import kafka.cluster.Broker
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
@@ -63,7 +63,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   )
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
-    (31 * topic.hashCode() + partitionId) % numFetchers
+    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
   }
 
   // to be defined in subclass to create a specific fetcher
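
The Utils.abs in getFetcherId matters because String.hashCode can be negative and, in Scala as in Java, % keeps the sign of its left operand, so the unpatched expression could yield a negative fetcher id. A sketch of the idea; safeAbs below is a stand-in, and whether kafka.utils.Utils.abs is implemented exactly this way is an assumption:

    // Masking off the sign bit is a common overflow-safe "abs": unlike
    // math.abs, it also maps Int.MinValue to a non-negative value.
    def safeAbs(n: Int): Int = n & 0x7fffffff

    def getFetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
      safeAbs(31 * topic.hashCode() + partitionId) % numFetchers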

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bc290f6/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b0a3962..242c18d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -77,16 +77,17 @@ class ReplicaManager(val config: KafkaConfig,
   newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
-      def value = {
-        leaderPartitionsLock synchronized {
-          leaderPartitions.count(_.isUnderReplicated)
-        }
-      }
+      def value = underReplicatedPartitionCount()
     }
   )
   val isrExpandRate = newMeter("IsrExpandsPerSec",  "expands", TimeUnit.SECONDS)
   val isrShrinkRate = newMeter("IsrShrinksPerSec",  "shrinks", TimeUnit.SECONDS)
 
+  def underReplicatedPartitionCount(): Int = {
+    leaderPartitionsLock synchronized {
+      leaderPartitions.count(_.isUnderReplicated)
+    }
+  }
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
@@ -206,7 +207,7 @@ class ReplicaManager(val config: KafkaConfig,
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
-                                .format(localBrokerId, stateInfo.leaderIsrAndControllerEpoch, leaderAndISRRequest.correlationId,
+                                .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition))
     }
     replicaStateChangeLock synchronized {
@@ -228,10 +229,17 @@ class ReplicaManager(val config: KafkaConfig,
        leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
           val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
           val partitionLeaderEpoch = partition.getLeaderEpoch()
+          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
           if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
-            // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-            // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-            partitionState.put(partition, partitionStateInfo)
+            if(partitionStateInfo.allReplicas.contains(config.brokerId))
+              partitionState.put(partition, partitionStateInfo)
+            else {
+              stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " +
+                "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]")
+                .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
+                partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId))
+            }
           } else {
             // Otherwise record the error code in response
            stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
@@ -303,7 +311,7 @@ class ReplicaManager(val config: KafkaConfig,
       case e: Throwable =>
         partitionState.foreach { state =>
          val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
-            "epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
+            " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
                                                 TopicAndPartition(state._1.topic, state._1.partitionId))
           stateChangeLogger.error(errorMsg, e)
         }
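
Two things change in ReplicaManager: the under-replicated count moves out of the gauge into a callable underReplicatedPartitionCount() so tests can query it directly, and, as the core KAFKA-1178 fix, a broker now ignores a LeaderAndIsr request for a partition whose assigned replica list no longer contains it, instead of blindly becoming a follower and starting a fetcher thread. A condensed sketch of that guard, with hypothetical pared-down types rather than the real request classes:

    case class PartitionStateInfo(leaderEpoch: Int, allReplicas: Set[Int])

    def shouldApplyStateChange(localBrokerId: Int, currentLeaderEpoch: Int,
                               info: PartitionStateInfo): Boolean =
      currentLeaderEpoch < info.leaderEpoch &&   // stale leader epochs are rejected
        info.allReplicas.contains(localBrokerId) // as are partitions this broker no longer hosts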

http://git-wip-us.apache.org/repos/asf/kafka/blob/6bc290f6/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 86f88f5..52d35a3 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaServer, KafkaConfig}
 import kafka.utils.{Logging, ZkUtils, TestUtils}
 import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
 
@@ -153,10 +153,10 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     // in sync replicas should not have any replica that is not in the new assigned replicas
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -183,6 +183,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -209,6 +210,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -247,6 +249,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+    // ensure that there are no under replicated partitions
+    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -382,6 +386,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       phantomInSyncReplicas.size == 0)
   }
 
+  private def ensureNoUnderReplicatedPartitions(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
+                                                servers: Seq[KafkaServer]) {
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    assertFalse("Reassigned partition [%s,%d] is underreplicated".format(topic, partitionToBeReassigned),
+                inSyncReplicas.size < assignedReplicas.size)
+    val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned)
+    assertTrue("Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned),
leader.isDefined)
+    val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
+    assertTrue("Reassigned partition [%s,%d] is underreplicated as reported by the leader
%d".format(topic, partitionToBeReassigned, leader.get),
+      leaderBroker.replicaManager.underReplicatedPartitionCount() == 0)
+  }
+
   private def checkIfReassignPartitionPathExists(): Boolean = {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
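
The new ensureNoUnderReplicatedPartitions helper checks a reassigned partition from two angles: the ISR recorded in zookeeper must cover the full assignment, and the elected leader's ReplicaManager must report zero under-replicated partitions via the underReplicatedPartitionCount() introduced above. Roughly, the check boils down to the following (zkClient and the parameter names as in the diff; a condensed restatement, not new API):

    val isr = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
    assert(isr.size >= assignedReplicas.size)                // zookeeper's view of the ISR
    val leaderId = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned).get
    val leaderBroker = servers.find(_.config.brokerId == leaderId).get
    assert(leaderBroker.replicaManager.underReplicatedPartitionCount() == 0)  // the leader's own view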

