kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1367811 [4/4] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/network/ main/sca...
Date Tue, 31 Jul 2012 22:51:01 GMT
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -43,33 +43,33 @@ import collection.mutable.{Map, Set}
  * Utility functions to help with testing
  */
 object TestUtils extends Logging {
-
+  
   val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
   val Digits = "0123456789"
   val LettersAndDigits = Letters + Digits
-
+  
   /* A consistent random number generator to make tests repeatable */
   val seededRandom = new Random(192348092834L)
   val random = new Random()
-
+  
   /**
    * Choose a number of random available ports
    */
   def choosePorts(count: Int): List[Int] = {
-    val sockets =
+    val sockets = 
       for(i <- 0 until count)
-      yield new ServerSocket(0)
+        yield new ServerSocket(0)
     val socketList = sockets.toList
     val ports = socketList.map(_.getLocalPort)
     socketList.map(_.close)
     ports
   }
-
+  
   /**
    * Choose an available port
    */
   def choosePort(): Int = choosePorts(1).head
-
+  
   /**
    * Create a temporary directory
    */
@@ -80,7 +80,7 @@ object TestUtils extends Logging {
     f.deleteOnExit()
     f
   }
-
+  
   /**
    * Create a temporary file
    */
@@ -89,12 +89,12 @@ object TestUtils extends Logging {
     f.deleteOnExit()
     f
   }
-
+  
   /**
    * Create a temporary file and return an open file channel for this file
    */
   def tempChannel(): FileChannel = new RandomAccessFile(tempFile(), "rw").getChannel()
-
+  
   /**
    * Create a kafka server instance with appropriate test settings
    * USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST
@@ -105,15 +105,15 @@ object TestUtils extends Logging {
     server.startup()
     server
   }
-
+  
   /**
    * Create a test config for the given node id
    */
   def createBrokerConfigs(numConfigs: Int): List[Properties] = {
     for((port, node) <- choosePorts(numConfigs).zipWithIndex)
-    yield createBrokerConfig(node, port)
+      yield createBrokerConfig(node, port)
   }
-
+  
   /**
    * Create a test config for the given node id
    */
@@ -127,7 +127,7 @@ object TestUtils extends Logging {
     props.put("replica.socket.timeout.ms", "1500")
     props
   }
-
+  
   /**
    * Create a test config for a consumer
    */
@@ -150,9 +150,9 @@ object TestUtils extends Logging {
    * Wrap the message in a message set
    * @param payload The bytes of the message
    */
-  def singleMessageSet(payload: Array[Byte]) =
+  def singleMessageSet(payload: Array[Byte]) = 
     new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(payload))
-
+  
   /**
    * Generate an array of random bytes
    * @param numBytes The size of the array
@@ -162,7 +162,7 @@ object TestUtils extends Logging {
     seededRandom.nextBytes(bytes)
     bytes
   }
-
+  
   /**
    * Generate a random string of letters and digits of the given length
    * @param len The length of the string
@@ -183,7 +183,7 @@ object TestUtils extends Logging {
     for(i <- 0 until b1.limit - b1.position)
       assertEquals("byte " + i + " byte not equal.", b1.get(b1.position + i), b2.get(b1.position + i))
   }
-
+  
   /**
    * Throw an exception if the two iterators are of differing lengths or contain
    * different messages on their Nth element
@@ -197,28 +197,28 @@ object TestUtils extends Logging {
 
     // check if the expected iterator is longer
     if (expected.hasNext) {
-      var length1 = length;
-      while (expected.hasNext) {
-        expected.next
-        length1 += 1
-      }
-      assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
+     var length1 = length;
+     while (expected.hasNext) {
+       expected.next
+       length1 += 1
+     }
+     assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
     }
 
     // check if the actual iterator was longer
     if (actual.hasNext) {
-      var length2 = length;
-      while (actual.hasNext) {
-        actual.next
-        length2 += 1
-      }
-      assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true);
+     var length2 = length;
+     while (actual.hasNext) {
+       actual.next
+       length2 += 1
+     }
+     assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true);
     }
   }
 
   /**
    *  Throw an exception if an iterable has different length than expected
-   *
+   *  
    */
   def checkLength[T](s1: Iterator[T], expectedLength:Int) {
     var n = 0
@@ -269,7 +269,7 @@ object TestUtils extends Logging {
    * Create a hexidecimal string for the given bytes
    */
   def hexString(bytes: Array[Byte]): String = hexString(ByteBuffer.wrap(bytes))
-
+  
   /**
    * Create a hexidecimal string for the given bytes
    */
@@ -279,7 +279,7 @@ object TestUtils extends Logging {
       builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position + i))))
     builder.toString
   }
-
+  
   /**
    * Create a producer for the given host and port
    */
@@ -340,7 +340,7 @@ object TestUtils extends Logging {
       buffer += ("msg" + i)
     buffer
   }
-
+  
   /**
    * Create a wired format request based on simple basic information
    */
@@ -381,55 +381,34 @@ object TestUtils extends Logging {
   }
 
   def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
-    leaderPerPartitionMap.foreach
-    {
-      leaderForPartition => {
-        val partition = leaderForPartition._1
-        val leader = leaderForPartition._2
-        try{
-          val currentLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition)
-          var newLeaderAndISR: LeaderAndISR = null
-          if(currentLeaderAndISROpt == None)
-            newLeaderAndISR = new LeaderAndISR(leader, List(leader))
-          else{
-            newLeaderAndISR = currentLeaderAndISROpt.get
-            newLeaderAndISR.leader = leader
-            newLeaderAndISR.leaderEpoch += 1
-            newLeaderAndISR.zkVersion += 1
-          }
-          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(
topic, partition), newLeaderAndISR.toString)
-        } catch {
-          case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
-        }
-      }
-    }
+    leaderPerPartitionMap.foreach(leaderForPartition => ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic,
+      leaderForPartition._1, leaderForPartition._2))
   }
 
-  def waitUntilLiveLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
-    // If the current leader is alive, just return it
-    val curLeaderOpt = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-    if(curLeaderOpt.isDefined && ZkUtils.pathExists(zkClient, ZkUtils.getBrokerPath(curLeaderOpt.get)))
-      return curLeaderOpt
-
+  def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
     val leaderLock = new ReentrantLock()
-    val liveLeaderIsElected = leaderLock.newCondition()
+    val leaderExists = leaderLock.newCondition()
 
+    info("Waiting for leader to be elected for topic %s partition %d".format(topic, partition))
     leaderLock.lock()
     try {
-      zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partition), new LeaderElectionListener(topic, partition, leaderLock,  liveLeaderIsElected, zkClient))
-      liveLeaderIsElected.await(timeoutMs, TimeUnit.MILLISECONDS)
+      // check if leader already exists
       val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
       leader match {
-        case Some(l) =>
-          if(ZkUtils.pathExists(zkClient, ZkUtils.getBrokerPath(l))){
-            info("Leader %d is elected for topic %s partition %d".format(l, topic, partition))
-            return leader
-          } else {
-            warn("Timing out after %d ms but current leader in zookeeper is not alive, and no live leader for partition [%s, %d] is elected".format(topic, partition))
-            return None
-          }
-        case None => warn("Timing out after %d ms but no leader is elected for topic %s partition %d".format(timeoutMs, topic, partition))
-        return None
+        case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic, partition))
+          leader
+        case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString),
+          new LeaderExistsListener(topic, partition, leaderLock, leaderExists))
+        info("No leader exists. Waiting for %d ms".format(timeoutMs))
+        leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
+          // check if leader is elected
+        val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+        leader match {
+          case Some(l) => info("Leader %d elected for topic %s partition %d".format(l, topic, partition))
+          case None => error("Timing out after %d ms since leader is not elected for topic %s partition %d"
+            .format(timeoutMs, topic, partition))
+        }
+        leader
       }
     } finally {
       leaderLock.unlock()
@@ -451,49 +430,50 @@ object TestUtils extends Logging {
 }
 
 object ControllerTestUtils{
-  def createTestLeaderAndISRRequest() : LeaderAndISRRequest = {
+  def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = {
     val topic1 = "test1"
     val topic2 = "test2"
 
-    val leader1 = 0;
-    val isr1 = List(0, 1, 2)
+    val leader1 = 1;
+    val ISR1 = List(1, 2, 3)
 
-    val leader2 = 0;
-    val isr2 = List(0, 2, 3)
+    val leader2 = 2;
+    val ISR2 = List(2, 3, 4)
 
-    val leaderAndISR1 = new LeaderAndISR(leader1, 1, isr1, 1)
-    val leaderAndISR2 = new LeaderAndISR(leader2, 1, isr2, 2)
-    val map = Map(((topic1, 0), leaderAndISR1),
-                  ((topic2, 0), leaderAndISR2))
-    new LeaderAndISRRequest( LeaderAndISRRequest.NotInit, map)
+    val leaderAndISR1 = new LeaderAndISR(leader1, 1, ISR1, 1)
+    val leaderAndISR2 = new LeaderAndISR(leader2, 1, ISR2, 2)
+    val map = Map(((topic1, 1), leaderAndISR1), ((topic1, 2), leaderAndISR1),
+                  ((topic2, 1), leaderAndISR2), ((topic2, 2), leaderAndISR2))
+    new LeaderAndISRRequest(1, "client 1", 1, 4, map)
   }
 
-  def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
+  def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
     val topic1 = "test1"
     val topic2 = "test2"
-    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
-                          ((topic2, 0), ErrorMapping.NoError))
+    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
+                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
     new LeaderAndISRResponse(1, responseMap)
   }
 
 
-  def createTestStopReplicaRequest() : StopReplicaRequest = {
+  def createSampleStopReplicaRequest() : StopReplicaRequest = {
     val topic1 = "test1"
     val topic2 = "test2"
-    new StopReplicaRequest(Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(1, "client 1", 1000, Set((topic1, 1), (topic1, 2),
+                                                    (topic2, 1), (topic2, 2)))
   }
 
-  def createTestStopReplicaResponse() : StopReplicaResponse = {
+  def createSampleStopReplicaResponse() : StopReplicaResponse = {
     val topic1 = "test1"
     val topic2 = "test2"
-    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
-                          ((topic2, 0), ErrorMapping.NoError))
+    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
+                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
     new StopReplicaResponse(1, responseMap)
   }
 }
 
 object TestZKUtils {
-  val zookeeperConnect = "127.0.0.1:2182"
+  val zookeeperConnect = "127.0.0.1:2182"  
 }
 
 class StringSerializer extends Encoder[String] {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Tue Jul 31 22:50:59 2012
@@ -40,7 +40,7 @@ class ZKEphemeralTest extends JUnit3Suit
 
     var testData: String = null
 
-    testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1
+    testData = ZkUtils.readData(zkClient, "/tmp/zktest")
     Assert.assertNotNull(testData)
 
     zkClient.close



Mime
View raw message