kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1411010 [2/2] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/controller/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/admin/ ...
Date Sun, 18 Nov 2012 22:48:25 GMT
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala Sun Nov 18 22:48:20 2012
@@ -25,6 +25,7 @@ import kafka.message.{Message, ByteBuffe
 import kafka.cluster.Broker
 import collection.mutable._
 import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.controller.LeaderIsrAndControllerEpoch
 
 
 object SerializationTestUtils{
@@ -83,11 +84,11 @@ object SerializationTestUtils{
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
 
   def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = {
-    val leaderAndIsr1 = new LeaderAndIsr(leader1, 1, isr1, 1)
-    val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
+    val leaderAndIsr1 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader1, 1, isr1, 1), 1)
+    val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map, collection.immutable.Set[Broker]())
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1)
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
@@ -97,13 +98,13 @@ object SerializationTestUtils{
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(controllerEpoch = 1, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {
     val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
                           ((topic2, 0), ErrorMapping.NoError))
-    new StopReplicaResponse(1, responseMap)
+    new StopReplicaResponse(1, responseMap.toMap)
   }
 
   def createTestProducerRequest: ProducerRequest = {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Sun Nov 18 22:48:20 2012
@@ -69,7 +69,7 @@ class TopicMetadataTest extends JUnit3Su
     val leaderForPartitionMap = Map(
       0 -> configs.head.brokerId
     )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val topicMetadata = mockLogManagerAndTestTopic(topic)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Sun Nov 18 22:48:20 2012
@@ -23,6 +23,10 @@ import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.controller.{LeaderIsrAndControllerEpoch, ControllerChannelManager}
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+import kafka.api._
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0
@@ -35,6 +39,8 @@ class LeaderElectionTest extends JUnit3S
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
+  var staleControllerEpochDetected = false
+
   override def setUp() {
     super.setUp()
     // start both servers
@@ -95,4 +101,50 @@ class LeaderElectionTest extends JUnit3S
     else
       assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
   }
+
+  def testLeaderElectionWithStaleControllerEpoch() {
+    // start 2 brokers
+    val topic = "new-topic"
+    val partitionId = 0
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+    // wait until leader is elected
+    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
+    val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    debug("leader Epoc: " + leaderEpoch1)
+    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+    assertTrue("Leader should get elected", leader1.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+    assertEquals("First epoch value should be 0", 0, leaderEpoch1)
+
+
+    // start another controller
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port))
+    val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
+    controllerChannelManager.startup()
+    val staleControllerEpoch = 0
+    val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch]
+    leaderAndIsr.put((topic, partitionId),
+      new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
+    val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch)
+
+    controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
+    TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
+    assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
+
+    controllerChannelManager.shutdown()
+  }
+
+  private def staleControllerEpochCallback(response: RequestOrResponse): Unit = {
+    val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
+    staleControllerEpochDetected = leaderAndIsrResponse.errorCode match {
+      case ErrorMapping.StaleControllerEpochCode => true
+      case _ => false
+    }
+  }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sun Nov 18 22:48:20 2012
@@ -372,7 +372,9 @@ object TestUtils extends Logging {
     new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }
 
-  def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
+  def makeLeaderForPartition(zkClient: ZkClient, topic: String,
+                             leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
+                             controllerEpoch: Int) {
     leaderPerPartitionMap.foreach
     {
       leaderForPartition => {
@@ -390,7 +392,7 @@ object TestUtils extends Logging {
             newLeaderAndIsr.zkVersion += 1
           }
           ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            newLeaderAndIsr.toString)
+            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
         } catch {
           case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
         }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1411010&r1=1411009&r2=1411010&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Sun Nov 18 22:48:20 2012
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package kafka.zk
 
@@ -35,7 +35,7 @@ class ZKEphemeralTest extends JUnit3Suit
     try {
       ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
     } catch {                       
-      case e: Exception => println("Exception in creating ephemeral node")
+      case e: Exception =>
     }
 
     var testData: String = null



Mime
View raw message