kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1382. Update zkVersion on partition state update failures; reviewed by Neha Narkhede
Date Tue, 10 Jun 2014 21:55:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b04a3be54 -> dee16451e


KAFKA-1382. Update zkVersion on partition state update failures; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dee16451
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dee16451
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dee16451

Branch: refs/heads/trunk
Commit: dee16451e04a4bb905a0479e08fa1f0848fea4fb
Parents: b04a3be
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Tue Jun 10 14:54:41 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Jun 10 14:54:58 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  40 +++----
 .../kafka/controller/KafkaController.scala      |  14 ++-
 .../controller/PartitionStateMachine.scala      |   6 +-
 .../scala/kafka/utils/ReplicationUtils.scala    |  83 ++++++++++++++
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  46 ++++----
 .../unit/kafka/utils/ReplicationUtilsTest.scala | 110 +++++++++++++++++++
 6 files changed, 247 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dee16451/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 518d2df..a9c0465 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils.{ZkUtils, Pool, Time, Logging}
+import kafka.utils.{ZkUtils, ReplicationUtils, Pool, Time, Logging}
 import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
@@ -216,7 +216,7 @@ class Partition(val topic: String,
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
-      
+
       leaderReplicaIdOpt.foreach { leaderReplica =>
         if (topic == OffsetManager.OffsetsTopicName &&
            /* if we are making a leader->follower transition */
@@ -261,7 +261,15 @@ class Partition(val topic: String,
            info("Expanding ISR for partition [%s,%d] from %s to %s"
                 .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in ZK and cache
-            updateIsr(newInSyncReplicas)
+            val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+              leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+            if(updateSucceeded) {
+              inSyncReplicas = newInSyncReplicas
+              zkVersion = newVersion
+              trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+            } else {
+              info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+            }
             replicaManager.isrExpandRate.mark()
           }
           maybeIncrementLeaderHW(leaderReplica)
@@ -325,7 +333,15 @@ class Partition(val topic: String,
             info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
               inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in zk and in cache
-            updateIsr(newInSyncReplicas)
+            val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+              leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+            if(updateSucceeded) {
+              inSyncReplicas = newInSyncReplicas
+              zkVersion = newVersion
+              trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+            } else {
+              info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+            }
             // we may need to increment high watermark since ISR could be down to 1
             maybeIncrementLeaderHW(leaderReplica)
             replicaManager.isrShrinkRate.mark()
@@ -373,22 +389,6 @@ class Partition(val topic: String,
     }
   }
 
-  private def updateIsr(newIsr: Set[Replica]) {
-    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
-    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
-      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
-    if (updateSucceeded){
-      inSyncReplicas = newIsr
-      zkVersion = newVersion
-    trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
-    } else {
-      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
-    }
-  }
-
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Partition]))
       return false

http://git-wip-us.apache.org/repos/asf/kafka/blob/dee16451/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d0cf5f1..8af48ab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -983,7 +983,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
               zkClient,
               ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
               ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-              leaderAndIsr.zkVersion)
+              leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
             newLeaderAndIsr.zkVersion = newVersion
 
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
@@ -1037,7 +1037,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
             zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
             ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-            leaderAndIsr.zkVersion)
+            leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
           newLeaderAndIsr.zkVersion = newVersion
           finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
           if (updateSucceeded)
@@ -1335,6 +1335,16 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle
     leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
     leaderAndIsrInfo.toString()
   }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case n: LeaderIsrAndControllerEpoch =>
+        leaderAndIsr.leader == n.leaderAndIsr.leader && leaderAndIsr.isr.sorted == n.leaderAndIsr.isr.sorted &&
+        leaderAndIsr.leaderEpoch == n.leaderAndIsr.leaderEpoch && controllerEpoch == n.controllerEpoch
+      case _ => false
+    }
+  }
 }
 
 object ControllerStats extends KafkaMetricsGroup {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dee16451/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 6457b56..e29e470 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -22,7 +22,7 @@ import collection.mutable.Buffer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
-import kafka.utils.{Logging, ZkUtils}
+import kafka.utils.{Logging, ZkUtils, ReplicationUtils}
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.log4j.Logger
@@ -336,7 +336,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
           ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
+          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
         newLeaderAndIsr = leaderAndIsr
         newLeaderAndIsr.zkVersion = newVersion
         zookeeperPathUpdateSucceeded = updateSucceeded
@@ -521,5 +521,3 @@ case object NewPartition extends PartitionState { val state: Byte = 0 }
 case object OnlinePartition extends PartitionState { val state: Byte = 1 }
 case object OfflinePartition extends PartitionState { val state: Byte = 2 }
 case object NonExistentPartition extends PartitionState { val state: Byte = 3 }
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/dee16451/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
new file mode 100644
index 0000000..eb53837
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+import kafka.cluster.Replica
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.zookeeper.data.Stat
+import org.I0Itec.zkclient.ZkClient
+
+import scala.Some
+import scala.collection._
+
+object ReplicationUtils extends Logging {
+
+  def updateIsr(zkClient: ZkClient, topic: String, partitionId: Int, brokerId: Int, leaderEpoch: Int,
+    controllerEpoch: Int, zkVersion: Int, newIsr: Set[Replica]): (Boolean,Int) = {
+    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
+    val newLeaderAndIsr = new LeaderAndIsr(brokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+    val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
+    val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
+    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
+    ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion,
+      Some(checkLeaderAndIsrZkData))
+  }
+
+  def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String,newLeaderData: String, zkVersion: Int): (Boolean,Int) = {
+    try {
+      val newLeaderStat: Stat = new Stat()
+      newLeaderStat.setVersion(zkVersion)
+      val newLeader = parseLeaderAndIsr(newLeaderData, path, newLeaderStat)
+      val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient,path)
+      val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
+      val writtenStat = writtenLeaderAndIsrInfo._2
+      writtenLeaderOpt match {
+        case Some(writtenData) =>
+          val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
+            (newLeader,writtenLeader) match {
+            case (Some(newLeader),Some(writtenLeader)) =>
+              if(newLeader.equals(writtenLeader))
+                return (true,writtenStat.getVersion())
+            case _ =>
+          }
+        case None =>
+      }
+    } catch {
+      case e1: Exception =>
+    }
+    (false,-1)
+  }
+
+  def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
+      : Option[LeaderIsrAndControllerEpoch] = {
+    Json.parseFull(leaderAndIsrStr) match {
+      case Some(m) =>
+        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
+        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
+        val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
+        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
+        val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
+        val zkPathVersion = stat.getVersion
+        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch,
+          isr.toString(), zkPathVersion, path))
+        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
+      case None => None
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/dee16451/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index fcbe269..1a23eb4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -20,7 +20,8 @@ package kafka.utils
 import kafka.cluster.{Broker, Cluster}
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
+  ZkMarshallingError, ZkBadVersionException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import collection._
 import kafka.api.LeaderAndIsr
@@ -58,7 +59,7 @@ object ZkUtils extends Logging {
     getTopicPath(topic) + "/partitions"
   }
 
-  def getTopicConfigPath(topic: String): String = 
+  def getTopicConfigPath(topic: String): String =
     TopicConfigPath + "/" + topic
 
   def getDeleteTopicPath(topic: String): String =
@@ -91,7 +92,7 @@ object ZkUtils extends Logging {
     val leaderAndIsrOpt = leaderAndIsrInfo._1
     val stat = leaderAndIsrInfo._2
     leaderAndIsrOpt match {
-      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, topic, partition, stat)
+      case Some(leaderAndIsrStr) => ReplicationUtils.parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat)
       case None => None
     }
   }
@@ -99,29 +100,12 @@ object ZkUtils extends Logging {
   def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
     getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
   }
-  
+
   def setupCommonPaths(zkClient: ZkClient) {
     for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
-  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
-  : Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(leaderAndIsrStr) match {
-      case Some(m) =>
-        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
-        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
-        val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
-        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
-        val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
-        val zkPathVersion = stat.getVersion
-        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for partition [%s,%d]".format(leader, epoch,
-          isr.toString(), zkPathVersion, topic, partition))
-        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
-      case None => None
-    }
-  }
-
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
     val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
@@ -380,16 +364,26 @@ object ZkUtils extends Logging {
    * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
    * exist, the current version is not the expected version, etc.) return (false, -1)
    */
-  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
+  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int,
+    optionalChecker:Option[(ZkClient, String,String,Int) => (Boolean,Int)] = None): (Boolean, Int) = {
     try {
       val stat = client.writeDataReturnStat(path, data, expectVersion)
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
-      case e: Exception =>
+      case e1: ZkBadVersionException => {
+        optionalChecker match {
+          case Some(checker) => return checker(client,path,data,expectVersion)
+          case _ => debug("Checker method is not passed skipping zkData match")
+        }
         error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
-          expectVersion, e.getMessage))
+          expectVersion, e1.getMessage))
+        (false, -1)
+      }
+      case e2: Exception =>
+        error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+          expectVersion, e2.getMessage))
         (false, -1)
     }
   }
@@ -428,7 +422,7 @@ object ZkUtils extends Logging {
       case e2: Throwable => throw e2
     }
   }
-  
+
   def deletePath(client: ZkClient, path: String): Boolean = {
     try {
       client.delete(path)
@@ -451,7 +445,7 @@ object ZkUtils extends Logging {
       case e2: Throwable => throw e2
     }
   }
-  
+
   def maybeDeletePath(zkUrl: String, dir: String) {
     try {
       val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/dee16451/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
new file mode 100644
index 0000000..f364980
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import kafka.cluster.{Replica, Partition}
+import kafka.server.{ReplicaFetcherManager, KafkaConfig}
+import kafka.utils.TestUtils._
+import kafka.zk.ZooKeeperTestHarness
+import kafka.log.Log
+import kafka.common.TopicAndPartition
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
+import org.junit.Test
+import org.I0Itec.zkclient.ZkClient
+import org.easymock.EasyMock
+import org.apache.log4j.Logger
+
+
+class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
+  private val logger = Logger.getLogger(classOf[UtilsTest])
+  val topic = "my-topic-test"
+  val partitionId = 0
+  val brokerId = 1
+  val leaderEpoch = 1
+  val controllerEpoch = 1
+  val zkVersion = 1
+  val topicPath = "/brokers/topics/my-topic-test/partitions/0/state"
+  val topicData = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
+    "versions" -> 1, "leader_epoch" -> 1,"isr" -> List(1,2)))
+  val topicDataVersionMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
+    "versions" -> 2, "leader_epoch" -> 1,"isr" -> List(2,1)))
+  val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
+    "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
+
+
+  override def setUp() {
+    super.setUp()
+    ZkUtils.createPersistentPath(zkClient,topicPath,topicData)
+  }
+
+  def testCheckLeaderAndIsrZkData() {
+    //mismatched zkVersion with the same data
+    val(dataMatched1,newZkVersion1) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataVersionMismatch,1)
+    assertTrue(dataMatched1)
+    assertEquals(newZkVersion1,0)
+
+    //mismatched zkVersion and leaderEpoch
+    val(dataMatched2,newZkVersion2) = ReplicationUtils.checkLeaderAndIsrZkData(zkClient,topicPath,topicDataMismatch,1)
+    assertFalse(dataMatched2)
+    assertEquals(newZkVersion2,-1)
+  }
+
+ def testUpdateIsr() {
+   val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_))
+
+   val log = EasyMock.createMock(classOf[kafka.log.Log])
+   EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
+   EasyMock.expect(log)
+   EasyMock.replay(log)
+
+   val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+   EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
+   EasyMock.replay(logManager)
+
+   val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+   EasyMock.expect(replicaManager.config).andReturn(configs.head)
+   EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+   EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
+   EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+   EasyMock.replay(replicaManager)
+
+   val partition = new Partition(topic,0,1,new MockTime,replicaManager)
+   val replicas = Set(new Replica(1,partition),new Replica(2,partition))
+
+   // regular update
+   val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateIsr(zkClient,
+     "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, 0, replicas)
+   assertTrue(updateSucceeded1)
+   assertEquals(newZkVersion1,1)
+
+   // mismatched zkVersion with the same data
+   val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateIsr(zkClient,
+     "my-topic-test", partitionId, brokerId, leaderEpoch, controllerEpoch, zkVersion + 1, replicas)
+   assertTrue(updateSucceeded2)
+   // returns true with existing zkVersion
+   assertEquals(newZkVersion2,1)
+
+   // mismatched zkVersion and leaderEpoch
+   val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateIsr(zkClient,
+     "my-topic-test", partitionId, brokerId, leaderEpoch + 1, controllerEpoch, zkVersion + 1, replicas)
+   assertFalse(updateSucceeded3)
+   assertEquals(newZkVersion3,-1)
+ }
+
+}


Mime
View raw message