kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1473; transient unit test failure in testRequestHandlingDuringDeleteTopic; patched by Guozhang Wang; reviewed by Jun Rao
Date Wed, 04 Jun 2014 20:33:28 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3a048e80d -> df449a24a


kafka-1473; transient unit test failure in testRequestHandlingDuringDeleteTopic;  patched
by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/df449a24
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/df449a24
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/df449a24

Branch: refs/heads/trunk
Commit: df449a24a7bd7b87502cab93c847a0dfea12c6db
Parents: 3a048e8
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Wed Jun 4 13:33:23 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Jun 4 13:33:23 2014 -0700

----------------------------------------------------------------------
 .../unit/kafka/admin/DeleteTopicTest.scala      | 67 ++------------------
 1 file changed, 7 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/df449a24/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index a821d60..1b3c04e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -76,7 +76,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val servers = createTestTopicAndCluster(topic)
     val controllerId = ZkUtils.getController(zkClient)
     val controller = servers.filter(s => s.config.brokerId == controllerId).head
-    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get &&
s.config.brokerId != controllerId).last
     follower.shutdown()
 
@@ -97,62 +97,6 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   @Test
-  def testRequestHandlingDuringDeleteTopic() {
-    val topicAndPartition = TopicAndPartition("test", 0)
-    val topic = topicAndPartition.topic
-    val servers = createTestTopicAndCluster(topic)
-    // start topic deletion
-    AdminUtils.deleteTopic(zkClient, topic)
-    // shut down one follower replica
-    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
-    assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
-    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
-    follower.shutdown()
-    // test if produce requests are failed with UnknownTopicOrPartitionException during delete
topic
-    val props1 = new Properties()
-    props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
-    props1.put("serializer.class", "kafka.serializer.StringEncoder")
-    props1.put("request.required.acks", "1")
-    val producerConfig1 = new ProducerConfig(props1)
-    val producer1 = new Producer[String, String](producerConfig1)
-    try {
-      producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
-      fail("Test should fail because the topic is being deleted")
-    } catch {
-      case e: FailedToSendMessageException =>
-      case oe: Throwable => fail("fails with exception", oe)
-    } finally {
-      producer1.close()
-    }
-    // test if fetch requests fail during delete topic
-    val availableServers: Seq[KafkaServer] = servers.filter(s => s.config.brokerId !=
follower.config.brokerId).toSeq
-    availableServers.foreach {
-      server =>
-        val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000,
64 * 1024, "")
-        val request = new FetchRequestBuilder()
-          .clientId("test-client")
-          .addFetch(topic, 0, 0, 10000)
-          .build()
-        val fetched = consumer.fetch(request)
-        val fetchResponse = fetched.data(topicAndPartition)
-        assertEquals("Fetch should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode,
fetchResponse.error)
-    }
-    // test if offset requests fail during delete topic
-    availableServers.foreach {
-      server =>
-        val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000,
64 * 1024, "")
-        val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(OffsetRequest.LatestTime,
1)))
-        val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
-        val errorCode = offsetResponse.partitionErrorAndOffsets(topicAndPartition).error
-        assertEquals("Offset request should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode,
errorCode)
-    }
-    // restart follower replica
-    follower.startup()
-    verifyTopicDeletion(topic, availableServers)
-    servers.foreach(_.shutdown())
-  }
-
-  @Test
   def testPartitionReassignmentDuringDeleteTopic() {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
@@ -168,7 +112,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
       res && server.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created.")
-    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
     follower.shutdown()
@@ -202,7 +146,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testDeleteTopicDuringAddPartition() {
     val topic = "test"
     val servers = createTestTopicAndCluster(topic)
-    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
     val newPartition = TopicAndPartition(topic, 1)
@@ -334,8 +278,11 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
       "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
     TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
       "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is
deleted")
+    // ensure that the topic-partition has been deleted from all brokers' replica managers
+    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res &&
server.replicaManager.getPartition(topic, 0) == None),
+      "Replica manager's should have deleted all of this topic's partitions")
     // ensure that logs from all replicas are deleted if delete topic is marked successful
in zookeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
   }
-}
\ No newline at end of file
+}


Mime
View raw message