kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject svn commit: r1418155 - /kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
Date Fri, 07 Dec 2012 01:09:12 GMT
Author: jjkoshy
Date: Fri Dec  7 01:09:11 2012
New Revision: 1418155

URL: http://svn.apache.org/viewvc?rev=1418155&view=rev
Log:
KAFKA-633 Fix mostly consistent test failure in testShutdownBroker; patched by Joel Koshy;
reviewed by Jun Rao

Modified:
    kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala

Modified: kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1418155&r1=1418154&r2=1418155&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Fri Dec  7 01:09:11 2012
@@ -374,35 +374,38 @@ class AdminTest extends JUnit3Suite with
     var controllerId = ZkUtils.getController(zkClient)
     var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
     var partitionsRemaining = controller.shutdownBroker(2)
-    assertEquals(0, partitionsRemaining)
-    var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-    assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-    assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
-
-    leaderBeforeShutdown = leaderAfterShutdown
-    controllerId = ZkUtils.getController(zkClient)
-    controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
-    partitionsRemaining = controller.shutdownBroker(1)
-    assertEquals(0, partitionsRemaining)
-    topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-    assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-    assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
-
-    leaderBeforeShutdown = leaderAfterShutdown
-    controllerId = ZkUtils.getController(zkClient)
-    controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
-    partitionsRemaining = controller.shutdownBroker(0)
-    assertEquals(1, partitionsRemaining)
-    topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-    assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
-    assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
-
-    servers.foreach(_.shutdown())
-
-
+    try {
+      assertEquals(0, partitionsRemaining)
+      var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+      var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
+      // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
+      assertEquals(2, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+
+      leaderBeforeShutdown = leaderAfterShutdown
+      controllerId = ZkUtils.getController(zkClient)
+      controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+      partitionsRemaining = controller.shutdownBroker(1)
+      assertEquals(0, partitionsRemaining)
+      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+      leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
+      // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+      assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+
+      leaderBeforeShutdown = leaderAfterShutdown
+      controllerId = ZkUtils.getController(zkClient)
+      controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+      partitionsRemaining = controller.shutdownBroker(0)
+      assertEquals(1, partitionsRemaining)
+      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+      leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+      assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
+      assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+    }
+    finally {
+      servers.foreach(_.shutdown())
+    }
   }
 
   private def checkIfReassignPartitionPathExists(): Boolean = {



Mime
View raw message