kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: kafka-1396; fix transient unit test ProducerFailureHandlingTest.testBrokerFailure; patched by Guozhang Wang; reviewed by Jun Rao
Date Wed, 14 May 2014 00:33:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1d35cce79 -> 31e32b386


kafka-1396; fix transient unit test ProducerFailureHandlingTest.testBrokerFailure; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/31e32b38
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/31e32b38
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/31e32b38

Branch: refs/heads/trunk
Commit: 31e32b386b7c31ca6b69f883a94268902e2cf17b
Parents: 1d35cce
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Tue May 13 17:32:10 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue May 13 17:32:10 2014 -0700

----------------------------------------------------------------------
 .../kafka/api/ProducerFailureHandlingTest.scala | 45 +++++++-------------
 .../kafka/api/ProducerSendTest.scala            |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 20 +++++++--
 3 files changed, 34 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/31e32b38/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index a993e8c..cd4ca2f 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -249,22 +249,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     // re-close producer is fine
   }
 
-  /* Temporarily disables the test since it hangs occasionally on the following stacktrace. Also, the test takes too long.
-"Test worker" prio=5 tid=7fb23bb48800 nid=0x10dc79000 waiting for monitor entry [10dc76000]
-   java.lang.Thread.State: BLOCKED (on object monitor)
-        at java.nio.HeapByteBuffer.slice(HeapByteBuffer.java:80)
-        at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:165)
-        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:191)
-        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:145)
-        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
-        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
-        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
-        at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
-        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
-        at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:399)
-        at kafka.utils.IteratorTemplate.toList(IteratorTemplate.scala:32)
-        at kafka.api.ProducerFailureHandlingTest.testBrokerFailure(ProducerFailureHandlingTest.scala:305)
-
   /**
   * With replication, producer should able able to find new leader after it detects broker failure
    */
@@ -273,14 +257,13 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     // create topic
     val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
     val partition = 0
-    var leader = leaders(partition)
-    assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined)
+    assertTrue("Leader of partition 0 of the topic should exist", leaders(partition).isDefined)
 
     val scheduler = new ProducerScheduler()
     scheduler.start
 
     // rolling bounce brokers
-    for (i <- 0 until 5) {
+    for (i <- 0 until 2) {
       server1.shutdown()
       server1.awaitShutdown()
       server1.startup
@@ -293,13 +276,24 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
 
       Thread.sleep(2000)
 
+      // Make sure the producer do not see any exception
+      // in returned metadata due to broker failures
       assertTrue(scheduler.failed == false)
+
+      // Make sure the leader still exists after bouncing brokers
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition)
     }
 
     scheduler.shutdown
-    leader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition, 500)
 
-    val fetchResponse = if(leader.get == server1.config.brokerId) {
+    // Make sure the producer do not see any exception
+    // when draining the left messages on shutdown
+    assertTrue(scheduler.failed == false)
+
+    // double check that the leader info has been propagated after consecutive bounces
+    val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition)
+
+    val fetchResponse = if(leader == server1.config.brokerId) {
      consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
     } else {
      consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
@@ -311,7 +305,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
 
     assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent,
uniqueMessageSize)
   }
-  */
 
   private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
   {
@@ -319,13 +312,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     var sent = 0
     var failed = false
 
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.ACKS_CONFIG, (-1).toString)
-    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
-    producerProps.put(ProducerConfig.RETRIES_CONFIG, 10.toString)
-    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
-    val producer = new KafkaProducer(producerProps)
+    val producer = TestUtils.createNewProducer(brokerList, bufferSize = bufferSize, retries = 10)
 
     override def doWork(): Unit = {
       val responses =
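
For context on the ProducerScheduler change above: the inline producer configuration is replaced by the shared TestUtils.createNewProducer helper. A minimal sketch of what that call is presumably equivalent to, based only on the properties removed in this hunk (the helper's actual signature and defaults live in TestUtils.scala and may differ):

  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

  // Sketch of the configuration assumed to sit behind
  // createNewProducer(brokerList, bufferSize = bufferSize, retries = 10);
  // brokerList and bufferSize come from the surrounding test harness.
  val producerProps = new Properties()
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
  producerProps.put(ProducerConfig.ACKS_CONFIG, (-1).toString)
  producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
  producerProps.put(ProducerConfig.RETRIES_CONFIG, 10.toString)
  producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
  val producer = new KafkaProducer(producerProps)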

http://git-wip-us.apache.org/repos/asf/kafka/blob/31e32b38/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index af11a49..3c2bf36 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -246,7 +246,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
       assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
 
       // double check that the topic is created with leader elected
-     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
 
     } finally {
       if (producer != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/31e32b38/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 034f361..4da0f2c 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -506,7 +506,7 @@ object TestUtils extends Logging {
                                         oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = {
     require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
     val startTime = System.currentTimeMillis()
-    var isLeaderElectedOrChanged = false;
+    var isLeaderElectedOrChanged = false
 
     trace("Waiting for leader to be elected or changed for partition [%s,%d], older leader
is %s, new leader is %s"
           .format(topic, partition, oldLeaderOpt, newLeaderOpt))
@@ -603,7 +603,18 @@ object TestUtils extends Logging {
     byteBuffer
   }
 
-  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = {
+
+  /**
+   * Wait until a valid leader is propagated to the metadata cache in each broker.
+   * It assumes that the leader propagated to each broker is the same.
+   * @param servers The list of servers that the metadata should reach to
+   * @param topic The topic name
+   * @param partition The partition Id
+   * @param timeout The amount of time waiting on this condition before assert to fail
+   * @return The leader of the partition.
+   */
+  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L): Int = {
+    var leader: Int = -1
     TestUtils.waitUntilTrue(() =>
       servers.foldLeft(true) {
         (result, server) =>
@@ -611,11 +622,14 @@ object TestUtils extends Logging {
           partitionStateOpt match {
             case None => false
             case Some(partitionState) =>
-            result && Request.isValidBrokerId(partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader)
+              leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+              result && Request.isValidBrokerId(leader)
           }
       },
       "Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout),
       waitTime = timeout)
+
+    leader
   }
   
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
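
A brief usage sketch of the revised helper: waitUntilMetadataIsPropagated now returns the propagated leader's broker id instead of Unit, so callers such as testBrokerFailure can pick the right broker to fetch from without a second leader lookup. This sketch assumes the servers, topic1, partition, server1, consumer1 and consumer2 fields of ProducerFailureHandlingTest shown above:

  // Wait until every broker's metadata cache reports a valid leader for the partition,
  // then fetch the partition from whichever broker currently leads it.
  val leader: Int = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition)
  val consumer = if (leader == server1.config.brokerId) consumer1 else consumer2
  val fetchResponse = consumer
    .fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build())
    .messageSet(topic1, partition)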

