kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1382 follow up patch; reviewed by Neha Narkhede
Date Thu, 19 Jun 2014 04:13:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 548d1ba09 -> dd048d8fe


KAFKA-1382 follow up patch; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd048d8f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd048d8f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd048d8f

Branch: refs/heads/trunk
Commit: dd048d8fee60db7282076a71eeeb8a1d0a4381d9
Parents: 548d1ba
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Wed Jun 18 21:13:41 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Jun 18 21:13:48 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  35 +++----
 .../kafka/controller/KafkaController.scala      |  31 ++----
 .../controller/PartitionStateMachine.scala      |   9 +-
 .../kafka/controller/ReplicaStateMachine.scala  |   6 +-
 .../scala/kafka/utils/ReplicationUtils.scala    |  39 ++++---
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  38 +++----
 .../unit/kafka/utils/ReplicationUtilsTest.scala | 103 ++++++++-----------
 7 files changed, 110 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd048d8f/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index a9c0465..f2ca856 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils.{ZkUtils, ReplicationUtils, Pool, Time, Logging}
+import kafka.utils.{ReplicationUtils, Pool, Time, Logging}
 import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
@@ -261,15 +261,7 @@ class Partition(val topic: String,
             info("Expanding ISR for partition [%s,%d] from %s to %s"
                  .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in ZK and cache
-            val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic,
partitionId, localBrokerId,
-              leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
-            if(updateSucceeded) {
-              inSyncReplicas = newInSyncReplicas
-              zkVersion = newVersion
-              trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","),
zkVersion))
-            } else {
-              info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
-            }
+            updateIsr(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
           }
           maybeIncrementLeaderHW(leaderReplica)
@@ -333,15 +325,7 @@ class Partition(val topic: String,
             info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
               inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in zk and in cache
-            val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic,
partitionId, localBrokerId,
-              leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
-            if(updateSucceeded) {
-              inSyncReplicas = newInSyncReplicas
-              zkVersion = newVersion
-              trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","),
zkVersion))
-            } else {
-              info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
-            }
+            updateIsr(newInSyncReplicas)
             // we may need to increment high watermark since ISR could be down to 1
             maybeIncrementLeaderHW(leaderReplica)
             replicaManager.isrShrinkRate.mark()
@@ -389,6 +373,19 @@ class Partition(val topic: String,
     }
   }
 
+  private def updateIsr(newIsr: Set[Replica]) {
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r =>
r.brokerId).toList, zkVersion)
+    val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic,
partitionId,
+      newLeaderAndIsr, controllerEpoch, zkVersion)
+    if(updateSucceeded) {
+      inSyncReplicas = newIsr
+      zkVersion = newVersion
+      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","),
zkVersion))
+    } else {
+      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+    }
+  }
+
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Partition]))
       return false

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd048d8f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 8af48ab..94bbd33 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -953,7 +953,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
brokerSt
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic,
partition)
+      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient,
topic, partition)
       zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
         case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR
changes
           val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
@@ -979,13 +979,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerSt
             val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch +
1,
               newIsr, leaderAndIsr.zkVersion + 1)
             // update the new leadership decision in zookeeper or retry
-            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
-              zkClient,
-              ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-              ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-              leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
-            newLeaderAndIsr.zkVersion = newVersion
+            val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient,
topic, partition,
+              newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
 
+            newLeaderAndIsr.zkVersion = newVersion
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr,
epoch))
             controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
             if (updateSucceeded)
@@ -1019,7 +1016,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerSt
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic,
partition)
+      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient,
topic, partition)
       zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
         case Some(leaderIsrAndEpoch) =>
           val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
@@ -1033,11 +1030,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerSt
           val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch
+ 1,
                                                  leaderAndIsr.isr, leaderAndIsr.zkVersion
+ 1)
           // update the new leadership decision in zookeeper or retry
-          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
-            zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-            leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
+          val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient,
topic,
+            partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
+
           newLeaderAndIsr.zkVersion = newVersion
           finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr,
epoch))
           if (updateSucceeded)
@@ -1335,16 +1330,6 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr,
controlle
     leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
     leaderAndIsrInfo.toString()
   }
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case null => false
-      case n: LeaderIsrAndControllerEpoch =>
-        leaderAndIsr.leader == n.leaderAndIsr.leader && leaderAndIsr.isr.sorted ==
n.leaderAndIsr.isr.sorted &&
-        leaderAndIsr.leaderEpoch == n.leaderAndIsr.leaderEpoch && controllerEpoch
== n.controllerEpoch
-      case _ => false
-    }
-  }
 }
 
 object ControllerStats extends KafkaMetricsGroup {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd048d8f/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e29e470..34c70b6 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -293,7 +293,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging
{
         } catch {
           case e: ZkNodeExistsException =>
             // read the controller epoch
-            val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
+            val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient,
topicAndPartition.topic,
               topicAndPartition.partition).get
             val failMsg = ("encountered error while changing partition %s's state from New
to Online since LeaderAndIsr path already " +
                            "exists with value %s and controller epoch %d")
@@ -334,9 +334,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging
{
         }
         // elect new leader or throw exception
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
-        val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
+        val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient,
topic, partition,
+          leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
         newLeaderAndIsr = leaderAndIsr
         newLeaderAndIsr.zkVersion = newVersion
         zookeeperPathUpdateSucceeded = updateSucceeded
@@ -383,7 +382,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging
{
 
   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch
= {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
+    ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
       case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
       case None =>
         val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state"

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd048d8f/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 2f0f29d..ad9c7c4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
-import kafka.utils.{ZkUtils, Logging}
+import kafka.utils.{ZkUtils, ReplicationUtils, Logging}
 import org.I0Itec.zkclient.IZkChildListener
 import org.apache.log4j.Logger
 import kafka.controller.Callbacks._
@@ -153,7 +153,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging
{
         case NewReplica =>
           assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
           // start replica as a follower to the current leader for its partition
-          val leaderIsrAndControllerEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient,
topic, partition)
+          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient,
topic, partition)
           leaderIsrAndControllerEpochOpt match {
             case Some(leaderIsrAndControllerEpoch) =>
               if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
@@ -367,5 +367,3 @@ case object ReplicaDeletionStarted extends ReplicaState { val state: Byte
= 4}
 case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5}
 case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6}
 case object NonExistentReplica extends ReplicaState { val state: Byte = 7 }
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd048d8f/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index eb53837..7157673 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -16,7 +16,7 @@
  */
 
 package kafka.utils
-import kafka.cluster.Replica
+
 import kafka.api.LeaderAndIsr
 import kafka.controller.LeaderIsrAndControllerEpoch
 import org.apache.zookeeper.data.Stat
@@ -27,31 +27,27 @@ import scala.collection._
 
 object ReplicationUtils extends Logging {
 
-  def updateIsr(zkClient: ZkClient, topic: String, partitionId: Int, brokerId: Int, leaderEpoch:
Int,
-    controllerEpoch: Int, zkVersion: Int, newIsr: Set[Replica]): (Boolean,Int) = {
-    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
-    val newLeaderAndIsr = new LeaderAndIsr(brokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList,
zkVersion)
+  def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr:
LeaderAndIsr, controllerEpoch: Int,
+    zkVersion: Int): (Boolean,Int) = {
+    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(",")))
     val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
     val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
     // use the epoch of the controller that made the leadership decision, instead of the
current controller epoch
-    ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion,
-      Some(checkLeaderAndIsrZkData))
+    ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
   }
 
-  def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String,newLeaderData: String, zkVersion:
Int): (Boolean,Int) = {
+  def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo:
String): (Boolean,Int) = {
     try {
-      val newLeaderStat: Stat = new Stat()
-      newLeaderStat.setVersion(zkVersion)
-      val newLeader = parseLeaderAndIsr(newLeaderData, path, newLeaderStat)
-      val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient,path)
+      val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, path)
       val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
       val writtenStat = writtenLeaderAndIsrInfo._2
+      val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat)
       writtenLeaderOpt match {
         case Some(writtenData) =>
           val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
-            (newLeader,writtenLeader) match {
-            case (Some(newLeader),Some(writtenLeader)) =>
-              if(newLeader.equals(writtenLeader))
+          (expectedLeader,writtenLeader) match {
+            case (Some(expectedLeader),Some(writtenLeader)) =>
+              if(expectedLeader == writtenLeader)
                 return (true,writtenStat.getVersion())
             case _ =>
           }
@@ -63,7 +59,18 @@ object ReplicationUtils extends Logging {
     (false,-1)
   }
 
-  def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
+  def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch]
= {
+    val leaderAndIsrPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition)
+    val leaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, leaderAndIsrPath)
+    val leaderAndIsrOpt = leaderAndIsrInfo._1
+    val stat = leaderAndIsrInfo._2
+    leaderAndIsrOpt match {
+      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath,
stat)
+      case None => None
+    }
+  }
+
+  private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
       : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd048d8f/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 1a23eb4..dcdc1ce 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -19,24 +19,20 @@ package kafka.utils
 
 import kafka.cluster.{Broker, Cluster}
 import kafka.consumer.TopicCount
-import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
+import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
   ZkMarshallingError, ZkBadVersionException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import collection._
 import kafka.api.LeaderAndIsr
-import mutable.ListBuffer
 import org.apache.zookeeper.data.Stat
-import java.util.concurrent.locks.{ReentrantLock, Condition}
 import kafka.admin._
 import kafka.common.{KafkaException, NoEpochForPartitionException}
 import kafka.controller.ReassignedPartitionsContext
-import kafka.controller.PartitionAndReplica
 import kafka.controller.KafkaController
-import scala.{collection, Some}
+import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
-import kafka.utils.Utils.inLock
 import scala.collection
 
 object ZkUtils extends Logging {
@@ -86,19 +82,8 @@ object ZkUtils extends Logging {
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
-  def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch]
= {
-    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
-    val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
-    val leaderAndIsrOpt = leaderAndIsrInfo._1
-    val stat = leaderAndIsrInfo._2
-    leaderAndIsrOpt match {
-      case Some(leaderAndIsrStr) => ReplicationUtils.parseLeaderAndIsr(leaderAndIsrStr,
leaderAndIsrPath, stat)
-      case None => None
-    }
-  }
-
   def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr]
= {
-    getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
+    ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
   }
 
   def setupCommonPaths(zkClient: ZkClient) {
@@ -363,26 +348,29 @@ object ZkUtils extends Logging {
   /**
    * Conditional update the persistent path data, return (true, newVersion) if it succeeds,
otherwise (the path doesn't
    * exist, the current version is not the expected version, etc.) return (false, -1)
+   *
+   * When there is a ConnectionLossException during the conditional update, zkClient will
retry the update and may fail
+   * since the previous update may have succeeded (but the stored zkVersion no longer matches
the expected one).
+   * In this case, we will run the optionalChecker to further check if the previous write
did indeed succeeded.
    */
   def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion:
Int,
-    optionalChecker:Option[(ZkClient, String,String,Int) => (Boolean,Int)] = None): (Boolean,
Int) = {
+    optionalChecker:Option[(ZkClient, String, String) => (Boolean,Int)] = None): (Boolean,
Int) = {
     try {
       val stat = client.writeDataReturnStat(path, data, expectVersion)
       debug("Conditional update of path %s with value %s and expected version %d succeeded,
returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
-      case e1: ZkBadVersionException => {
+      case e1: ZkBadVersionException =>
         optionalChecker match {
-          case Some(checker) => return checker(client,path,data,expectVersion)
+          case Some(checker) => return checker(client, path, data)
           case _ => debug("Checker method is not passed skipping zkData match")
         }
-        error("Conditional update of path %s with data %s and expected version %d failed
due to %s".format(path, data,
+        warn("Conditional update of path %s with data %s and expected version %d failed due
to %s".format(path, data,
           expectVersion, e1.getMessage))
         (false, -1)
-      }
       case e2: Exception =>
-        error("Conditional update of path %s with data %s and expected version %d failed
due to %s".format(path, data,
+        warn("Conditional update of path %s with data %s and expected version %d failed due
to %s".format(path, data,
           expectVersion, e2.getMessage))
         (false, -1)
     }
@@ -512,7 +500,7 @@ object ZkUtils extends Logging {
   : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     for(topicAndPartition <- topicAndPartitions) {
-      ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
match {
+      ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
topicAndPartition.partition) match {
         case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
         case None =>
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd048d8f/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index f364980..84e0855 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -17,22 +17,17 @@
 
 package kafka.utils
 
-import kafka.cluster.{Replica, Partition}
 import kafka.server.{ReplicaFetcherManager, KafkaConfig}
-import kafka.utils.TestUtils._
+import kafka.api.LeaderAndIsr
 import kafka.zk.ZooKeeperTestHarness
-import kafka.log.Log
 import kafka.common.TopicAndPartition
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
 import org.junit.Test
-import org.I0Itec.zkclient.ZkClient
 import org.easymock.EasyMock
-import org.apache.log4j.Logger
 
 
 class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
-  private val logger = Logger.getLogger(classOf[UtilsTest])
   val topic = "my-topic-test"
   val partitionId = 0
   val brokerId = 1
@@ -43,7 +38,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness
{
   val topicData = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
     "versions" -> 1, "leader_epoch" -> 1,"isr" -> List(1,2)))
   val topicDataVersionMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" ->
1,
-    "versions" -> 2, "leader_epoch" -> 1,"isr" -> List(2,1)))
+    "versions" -> 2, "leader_epoch" -> 1,"isr" -> List(1,2)))
   val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
     "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
 
@@ -53,58 +48,48 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness
{
     ZkUtils.createPersistentPath(zkClient,topicPath,topicData)
   }
 
-  def testCheckLeaderAndIsrZkData() {
-    //mismatched zkVersion with the same data
-    val(dataMatched1,newZkVersion1) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataVersionMismatch,1)
-    assertTrue(dataMatched1)
-    assertEquals(newZkVersion1,0)
-
-    //mismatched zkVersion and leaderEpoch
-    val(dataMatched2,newZkVersion2) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataMismatch,1)
-    assertFalse(dataMatched2)
-    assertEquals(newZkVersion2,-1)
+  @Test
+  def testUpdateLeaderAndIsr() {
+    val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_))
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
+    EasyMock.expect(log)
+    EasyMock.replay(log)
+
+    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
+    EasyMock.replay(logManager)
+
+    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
+    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.replay(replicaManager)
+
+    val replicas = List(0,1)
+
+    // regular update
+    val newLeaderAndIsr1 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, 0)
+    val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkClient,
+      "my-topic-test", partitionId, newLeaderAndIsr1, controllerEpoch, 0)
+    assertTrue(updateSucceeded1)
+    assertEquals(newZkVersion1, 1)
+
+    // mismatched zkVersion with the same data
+    val newLeaderAndIsr2 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, zkVersion +
1)
+    val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkClient,
+      "my-topic-test", partitionId, newLeaderAndIsr2, controllerEpoch, zkVersion + 1)
+    assertTrue(updateSucceeded2)
+    // returns true with existing zkVersion
+    assertEquals(newZkVersion2,1)
+
+    // mismatched zkVersion and leaderEpoch
+    val newLeaderAndIsr3 = new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion
+ 1)
+    val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkClient,
+      "my-topic-test", partitionId, newLeaderAndIsr3, controllerEpoch, zkVersion + 1)
+    assertFalse(updateSucceeded3)
+    assertEquals(newZkVersion3,-1)
   }
 
- def testUpdateIsr() {
-   val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_))
-
-   val log = EasyMock.createMock(classOf[kafka.log.Log])
-   EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
-   EasyMock.expect(log)
-   EasyMock.replay(log)
-
-   val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-   EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
-   EasyMock.replay(logManager)
-
-   val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
-   EasyMock.expect(replicaManager.config).andReturn(configs.head)
-   EasyMock.expect(replicaManager.logManager).andReturn(logManager)
-   EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
-   EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
-   EasyMock.replay(replicaManager)
-
-   val partition = new Partition(topic,0,1,new MockTime,replicaManager)
-   val replicas = Set(new Replica(1,partition),new Replica(2,partition))
-
-   // regular update
-   val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateIsr(zkClient,
-     "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, 0, replicas)
-   assertTrue(updateSucceeded1)
-   assertEquals(newZkVersion1,1)
-
-   // mismatched zkVersion with the same data
-   val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateIsr(zkClient,
-     "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, zkVersion + 1,
replicas)
-   assertTrue(updateSucceeded2)
-   // returns true with existing zkVersion
-   assertEquals(newZkVersion2,1)
-
-   // mismatched zkVersion and leaderEpoch
-   val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateIsr(zkClient,
-     "my-topic-test", partitionId, brokerId, leaderEpoch + 1, controllerEpoch, zkVersion
+ 1, replicas)
-   assertFalse(updateSucceeded3)
-   assertEquals(newZkVersion3,-1)
- }
-
 }


Mime
View raw message