kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1303473 [2/2] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/cluster/ main/scala/kafka/consumer/ main/scala/kafka/javaapi/ main/scala/kafka/log/ main/scala/kafka/producer/ main/scala/kafka/producer/as...
Date Wed, 21 Mar 2012 17:29:34 GMT
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1303473&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
(added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
Wed Mar 21 17:29:32 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.admin.CreateTopicCommand
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.TestUtils._
+import junit.framework.Assert._
+import kafka.utils.{ZKStringSerializer, Utils, TestUtils}
+
+class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+  val brokerId1 = 0
+  val brokerId2 = 1
+
+  val port1 = TestUtils.choosePort()
+  val port2 = TestUtils.choosePort()
+
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+
+  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+  var zkClient: ZkClient = null
+
+  override def setUp() {
+    super.setUp()
+
+    zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
+
+    // start both servers
+    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+
+    servers ++= List(server1, server2)
+  }
+
+  override def tearDown() {
+    // shutdown the servers and delete data hosted on them
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDir))
+
+    super.tearDown()
+  }
+
+  def testLeaderElectionWithCreateTopic {
+    // start 2 brokers
+    val topic = "new-topic"
+    val partitionId = 0
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+    // wait until leader is elected
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+
+    assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+
+    // kill the server hosting the preferred replica
+    servers.head.shutdown()
+
+    // check if leader moves to the other server
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 5000)
+    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+
+    Thread.sleep(500)
+
+    // bring the preferred replica back
+    servers.head.startup()
+
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    // TODO: Once the optimization for preferred replica re-election is in, this check should
change to broker 0
+    assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
+
+    // shutdown current leader (broker 1)
+    servers.last.shutdown()
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+
+    // test if the leader is the preferred replica
+    assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
Wed Mar 21 17:29:32 2012
@@ -24,10 +24,11 @@ import junit.framework.Assert._
 import kafka.message.{Message, ByteBufferMessageSet}
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestUtils, Utils}
 import kafka.producer._
+import kafka.utils.TestUtils._
 import kafka.admin.CreateTopicCommand
 import kafka.api.FetchRequestBuilder
+import kafka.utils.{TestUtils, Utils}
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
@@ -72,7 +73,7 @@ class ServerShutdownTest extends JUnit3S
       val server = new KafkaServer(config)
       server.startup()
 
-      Thread.sleep(100)
+      waitUntilLeaderIsElected(zookeeper.client, topic, 0, 1000)
 
       var fetchedMessage: ByteBufferMessageSet = null
       while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
@@ -83,7 +84,6 @@ class ServerShutdownTest extends JUnit3S
       val newOffset = fetchedMessage.validBytes
 
       // send some more messages
-      println("Sending messages to topic " + topic)
       producer.send(new ProducerData[Int, Message](topic, 0, sent2))
 
       Thread.sleep(200)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1303473&r1=1303472&r2=1303473&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Wed
Mar 21 17:29:32 2012
@@ -34,11 +34,13 @@ import kafka.consumer.{KafkaMessageStrea
 import scala.collection.Map
 import kafka.serializer.Encoder
 import kafka.api.{ProducerRequest, TopicData, PartitionData}
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.TimeUnit
 
 /**
  * Utility functions to help with testing
  */
-object TestUtils {
+object TestUtils extends Logging {
   
   val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
   val Digits = "0123456789"
@@ -385,6 +387,35 @@ object TestUtils {
     val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout,
data)  	
     pr
   }
+
+  def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs:
Long): Option[Int] = {
+    val leaderLock = new ReentrantLock()
+    val leaderExists = leaderLock.newCondition()
+
+    info("Waiting for leader to be elected for topic %s partition %d".format(topic, partition))
+    leaderLock.lock()
+    try {
+      // check if leader already exists
+      val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+      leader match {
+        case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic,
partition))
+          leader
+        case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic,
partition.toString),
+          new LeaderExists(topic, partition, leaderExists))
+        leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
+          // check if leader is elected
+        val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+        leader match {
+          case Some(l) => info("Leader %d elected for topic %s partition %d".format(l,
topic, partition))
+          case None => error("Timing out after %d ms since leader is not elected for topic
%s partition %d"
+            .format(timeoutMs, topic, partition))
+        }
+        leader
+      }
+    } finally {
+      leaderLock.unlock()
+    }
+  }
 }
 
 object TestZKUtils {



Mime
View raw message