kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1382988 [2/2] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/server/ main/scala/kafka/utils/ test/sc...
Date Mon, 10 Sep 2012 17:09:52 GMT
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Mon Sep 10 17:09:52 2012
@@ -23,10 +23,10 @@ import org.I0Itec.zkclient.{IZkDataListe
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
-import kafka.api.LeaderAndISR
-import kafka.common.NoEpochForPartitionException
+import kafka.api.LeaderAndIsr
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
+import kafka.common.{KafkaException, NoEpochForPartitionException}
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -43,15 +43,17 @@ object ZkUtils extends Logging {
   }
 
   def getController(zkClient: ZkClient): Int= {
-    val controller = readDataMaybeNull(zkClient, ControllerPath)._1
-    controller.toInt
+    readDataMaybeNull(zkClient, ControllerPath)._1 match {
+      case Some(controller) => controller.toInt
+      case None => throw new KafkaException("Controller doesn't exist")
+    }
   }
 
   def getTopicPartitionPath(topic: String, partitionId: Int): String ={
     getTopicPartitionsPath(topic) + "/" + partitionId
   }
 
-  def getTopicPartitionLeaderAndISRPath(topic: String, partitionId: Int): String ={
+  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String ={
     getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR"
   }
 
@@ -65,41 +67,42 @@ object ZkUtils extends Logging {
 
   def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
     val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
-    getBrokerInfoFromIds(zkClient, brokerIds.map(_.toInt))
+    brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
-
-  def getLeaderAndISRForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndISR]
= {
-    val leaderAndISRPath = getTopicPartitionLeaderAndISRPath(topic, partition)
-    val ret = readDataMaybeNull(zkClient, leaderAndISRPath)
-    val leaderAndISRStr: String = ret._1
-    val stat = ret._2
-    if(leaderAndISRStr == null) None
-    else {
-      SyncJSON.parseFull(leaderAndISRStr) match {
-        case Some(m) =>
-          val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
-          val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
-          val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
-          val ISR = Utils.getCSVList(ISRString).map(r => r.toInt)
-          val zkPathVersion = stat.getVersion
-          debug("Leader %d, Epoch %d, isr %s, zk path version %d for topic %s and partition
%d".format(leader, epoch, ISR.toString(), zkPathVersion, topic, partition))
-          Some(LeaderAndISR(leader, epoch, ISR.toList, zkPathVersion))
-        case None => None
-      }
+  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr]
= {
+    val leaderAndISRPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
+    val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndISRPath)
+    val leaderAndIsrOpt = leaderAndIsrInfo._1
+    val stat = leaderAndIsrInfo._2
+    leaderAndIsrOpt match {
+      case Some(leaderAndIsrStr) =>
+        SyncJSON.parseFull(leaderAndIsrStr) match {
+          case Some(m) =>
+            val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
+            val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+            val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+            val isr = Utils.getCSVList(isrString).map(r => r.toInt)
+            val zkPathVersion = stat.getVersion
+            debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition
%d".format(leader, epoch,
+              isr.toString(), zkPathVersion, topic, partition))
+            Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
+          case None => None
+        }
+      case None => None // TODO: Handle if leader and isr info is not available in zookeeper
     }
   }
 
-
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int]
= {
-    val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic,
partition))._1
-    if(leaderAndISR == null) None
-    else {
-      SyncJSON.parseFull(leaderAndISR) match {
-        case Some(m) =>
-          Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
-        case None => None
-      }
+    val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic,
partition))._1
+    leaderAndIsrOpt match {
+      case Some(leaderAndIsr) =>
+        SyncJSON.parseFull(leaderAndIsr) match {
+          case Some(m) =>
+            Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
+          case None => None
+        }
+      case None => None
     }
   }
 
@@ -109,32 +112,32 @@ object ZkUtils extends Logging {
    * other broker will retry becoming leader with the same new epoch value.
    */
   def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
-    val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic,
partition))._1
-    if(leaderAndISR != null) {
-      val epoch = SyncJSON.parseFull(leaderAndISR) match {
-        case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data
for topic %s partition %d is invalid".format(topic, partition))
-        case Some(m) =>
-          m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
-      }
-      epoch
+    val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic,
partition))._1
+    leaderAndIsrOpt match {
+      case Some(leaderAndIsr) =>
+        SyncJSON.parseFull(leaderAndIsr) match {
+          case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR
data for topic %s partition %d is invalid".format(topic, partition))
+          case Some(m) => m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+        }
+      case None => throw new NoEpochForPartitionException("No epoch, ISR path for topic
%s partition %d is empty"
+        .format(topic, partition))
     }
-    else
-      throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d
is empty".format(topic, partition))
   }
 
   /**
    * Gets the in-sync replicas (ISR) for a specific topic and partition
    */
   def getInSyncReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int]
= {
-    val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic,
partition))._1
-    if(leaderAndISR == null) Seq.empty[Int]
-    else {
-      SyncJSON.parseFull(leaderAndISR) match {
-        case Some(m) =>
-          val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
-          Utils.getCSVList(ISRString).map(r => r.toInt)
-        case None => Seq.empty[Int]
-      }
+    val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic,
partition))._1
+    leaderAndIsrOpt match {
+      case Some(leaderAndIsr) =>
+        SyncJSON.parseFull(leaderAndIsr) match {
+          case Some(m) =>
+            val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
+            Utils.getCSVList(ISRString).map(r => r.toInt)
+          case None => Seq.empty[Int]
+        }
+      case None => Seq.empty[Int]
     }
   }
 
@@ -142,19 +145,18 @@ object ZkUtils extends Logging {
    * Gets the assigned replicas (AR) for a specific topic and partition
    */
   def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int]
= {
-    val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
-    val assignedReplicas = if (jsonPartitionMap == null) {
-      Seq.empty[Int]
-    } else {
-      SyncJSON.parseFull(jsonPartitionMap) match {
-        case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString)
match {
+    val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+    jsonPartitionMapOpt match {
+      case Some(jsonPartitionMap) =>
+        SyncJSON.parseFull(jsonPartitionMap) match {
+          case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString)
match {
+            case None => Seq.empty[Int]
+            case Some(seq) => seq.map(_.toInt)
+          }
           case None => Seq.empty[Int]
-          case Some(seq) => seq.map(_.toInt)
         }
-        case None => Seq.empty[Int]
-      }
+      case None => Seq.empty[Int]
     }
-    assignedReplicas
   }
 
   def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int):
Boolean = {
@@ -347,17 +349,16 @@ object ZkUtils extends Logging {
     (dataStr, stat)
   }
 
-  def readDataMaybeNull(client: ZkClient, path: String): (String, Stat) = {
+  def readDataMaybeNull(client: ZkClient, path: String): (Option[String], Stat) = {
     val stat: Stat = new Stat()
-    var dataStr: String = null
-    try{
-      dataStr = client.readData(path, stat)
-      return (dataStr, stat)
-    } catch {
-      case e: ZkNoNodeException =>
-        return (null, stat)
-      case e2 => throw e2
-    }
+    val dataAndStat = try {
+                        (Some(client.readData(path, stat)), stat)
+                      } catch {
+                        case e: ZkNoNodeException =>
+                          (None, stat)
+                        case e2 => throw e2
+                      }
+    dataAndStat
   }
 
   def getChildren(client: ZkClient, path: String): Seq[String] = {
@@ -396,59 +397,66 @@ object ZkUtils extends Logging {
     cluster
   }
 
-  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String,
Int), Seq[Int]] = {
+  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]):
+  mutable.Map[(String, Int), Seq[Int]] = {
     val ret = new mutable.HashMap[(String, Int), Seq[Int]]
-    topics.foreach{ topic =>
-      val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
-      if (jsonPartitionMap != null) {
-        SyncJSON.parseFull(jsonPartitionMap) match {
-          case Some(m) =>
-            val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
-            for((partition, replicas) <- replicaMap){
-              ret.put((topic, partition.toInt), replicas.map(_.toInt))
-              debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic,
partition, replicas))
-            }
-          case None =>
-        }
+    topics.foreach { topic =>
+      val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+      jsonPartitionMapOpt match {
+        case Some(jsonPartitionMap) =>
+          SyncJSON.parseFull(jsonPartitionMap) match {
+            case Some(m) =>
+              val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
+              for((partition, replicas) <- replicaMap){
+                ret.put((topic, partition.toInt), replicas.map(_.toInt))
+                debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic,
partition, replicas))
+              }
+            case None =>
+          }
+        case None =>
       }
-                  }
+    }
     ret
   }
 
-  def getPartitionLeaderAndISRForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String,
Int), LeaderAndISR] = {
-    val ret = new mutable.HashMap[(String, Int), LeaderAndISR]
+  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Iterator[String]):
+  mutable.Map[(String, Int), LeaderAndIsr] = {
+    val ret = new mutable.HashMap[(String, Int), LeaderAndIsr]
     val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
-    for((topic, partitions) <- partitionsForTopics){
-      for(partition <- partitions){
-        val leaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition.toInt)
-        if(leaderAndISROpt.isDefined)
-          ret.put((topic, partition.toInt), leaderAndISROpt.get)
+    for((topic, partitions) <- partitionsForTopics) {
+      for(partition <- partitions) {
+        ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition.toInt) match {
+          case Some(leaderAndIsr) => ret.put((topic, partition.toInt), leaderAndIsr)
+          case None =>
+        }
       }
     }
     ret
   }
 
-  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String,
collection.Map[Int, Seq[Int]]] = {
+  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]):
+  mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
     val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
     topics.foreach{ topic =>
-      val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
-      val partitionMap = if (jsonPartitionMap == null) {
-        Map[Int, Seq[Int]]()
-      } else {
-        SyncJSON.parseFull(jsonPartitionMap) match {
-          case Some(m) =>
-            val m1 = m.asInstanceOf[Map[String, Seq[String]]]
-            m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
-          case None => Map[Int, Seq[Int]]()
-        }
+      val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+      val partitionMap = jsonPartitionMapOpt match {
+        case Some(jsonPartitionMap) =>
+          SyncJSON.parseFull(jsonPartitionMap) match {
+            case Some(m) =>
+              val m1 = m.asInstanceOf[Map[String, Seq[String]]]
+              m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
+            case None => Map[Int, Seq[Int]]()
+          }
+        case None => Map[Int, Seq[Int]]()
       }
-      debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
+      debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
       ret += (topic -> partitionMap)
-                  }
+    }
     ret
   }
 
-  def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String,
collection.Map[Int, Seq[Int]]]): mutable.Map[(String, Int), Seq[Int]] = {
+  def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String,
collection.Map[Int, Seq[Int]]]):
+  mutable.Map[(String, Int), Seq[Int]] = {
     val ret = new mutable.HashMap[(String, Int), Seq[Int]]
     for((topic, partitionAssignment) <- topicPartitionAssignment){
       for((partition, replicaAssignment) <- partitionAssignment){
@@ -468,7 +476,8 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int):
Map[(String, Int), Seq[Int]] = {
+  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int):
+  Map[(String, Int), Seq[Int]] = {
     val ret = new mutable.HashMap[(String, Int), Seq[Int]]
     val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
     topicsAndPartitions.map
@@ -499,7 +508,8 @@ object ZkUtils extends Logging {
   def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] =
{
     val dirs = new ZKGroupDirs(group)
     val consumersInGroup = getConsumersInGroup(zkClient, group)
-    val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,ZkUtils.readData(zkClient,
dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient))
+    val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
+      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient))
     consumersInGroup.zip(topicCountMaps).toMap
   }
 
@@ -522,7 +532,19 @@ object ZkUtils extends Logging {
     consumersPerTopicMap
   }
 
-  def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] = brokerIds.map(
bid => Broker.createBroker(bid, ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath
+ "/" + bid)._1))
+  /**
+   * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
+   * or throws an exception if the broker dies before the query to zookeeper finishes
+   * @param brokerId The broker id
+   * @param zkClient The zookeeper client connection
+   * @returns An optional Broker object encapsulating the broker metadata
+   */
+  def getBrokerInfo(zkClient: ZkClient, brokerId: Int): Option[Broker] = {
+    ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match
{
+      case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo))
+      case None => None
+    }
+  }
 
   def getAllTopics(zkClient: ZkClient): Seq[String] = {
     val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala Mon Sep 10 17:09:52 2012
@@ -53,12 +53,12 @@ object RpcDataSerializationTestUtils{
   private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
 
-  def createTestLeaderAndISRRequest() : LeaderAndISRRequest = {
-    val leaderAndISR1 = new LeaderAndISR(leader1, 1, isr1, 1)
-    val leaderAndISR2 = new LeaderAndISR(leader2, 1, isr2, 2)
+  def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = {
+    val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1)
+    val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
     val map = Map(((topic1, 0), leaderAndISR1),
                   ((topic2, 0), leaderAndISR2))
-    new LeaderAndISRRequest( LeaderAndISRRequest.NotInit, map)
+    new LeaderAndIsrRequest( LeaderAndIsrRequest.NotInit, map)
   }
 
   def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
@@ -129,7 +129,7 @@ class RpcDataSerializationTest extends J
     var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes())
     leaderAndISRRequest.writeTo(buffer)
     buffer.rewind()
-    val deserializedLeaderAndISRRequest = LeaderAndISRRequest.readFrom(buffer)
+    val deserializedLeaderAndISRRequest = LeaderAndIsrRequest.readFrom(buffer)
     assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndISRRequest,
                  deserializedLeaderAndISRRequest)
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Mon Sep 10 17:09:52 2012
@@ -60,7 +60,7 @@ class RequestPurgatoryTest extends JUnit
   }
 
   @Test
-  def testRequestExpirey() {
+  def testRequestExpiry() {
     val expiration = 20L
     val r1 = new DelayedRequest(Array("test1"), null, expiration)
     val r2 = new DelayedRequest(Array("test1"), null, 200000L)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Sep 10 17:09:52 2012
@@ -33,9 +33,7 @@ import collection.mutable.ListBuffer
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
-import kafka.common.ErrorMapping
 import kafka.api._
-import collection.mutable.{Map, Set}
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 
 
@@ -397,17 +395,17 @@ object TestUtils extends Logging {
         val partition = leaderForPartition._1
         val leader = leaderForPartition._2
         try{
-          val currentLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic,
partition)
-          var newLeaderAndISR: LeaderAndISR = null
+          val currentLeaderAndISROpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic,
partition)
+          var newLeaderAndISR: LeaderAndIsr = null
           if(currentLeaderAndISROpt == None)
-            newLeaderAndISR = new LeaderAndISR(leader, List(leader))
+            newLeaderAndISR = new LeaderAndIsr(leader, List(leader))
           else{
             newLeaderAndISR = currentLeaderAndISROpt.get
             newLeaderAndISR.leader = leader
             newLeaderAndISR.leaderEpoch += 1
             newLeaderAndISR.zkVersion += 1
           }
-          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(
topic, partition), newLeaderAndISR.toString)
+          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(
topic, partition), newLeaderAndISR.toString)
         } catch {
           case oe => error("Error while electing leader for topic %s partition %d".format(topic,
partition), oe)
         }
@@ -426,7 +424,7 @@ object TestUtils extends Logging {
 
     leaderLock.lock()
     try {
-      zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partition),
new LeaderExistsOrChangedListener(topic, partition, leaderLock, leaderExistsOrChanged, oldLeaderOpt,
zkClient))
+      zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
new LeaderExistsOrChangedListener(topic, partition, leaderLock, leaderExistsOrChanged, oldLeaderOpt,
zkClient))
       leaderExistsOrChanged.await(timeoutMs, TimeUnit.MILLISECONDS)
       // check if leader is elected
       val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)



Mime
View raw message