kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4528: Fix failure in ProducerTest.testAsyncSendCanCorrectlyFailWithTimeout
Date Tue, 03 Jan 2017 17:11:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9e9e6d875 -> 7f22ab655


KAFKA-4528: Fix failure in ProducerTest.testAsyncSendCanCorrectlyFailWithTimeout

I was able to reproduce the failure in less than 10 runs before the change. With the change,
the test passed 70 times consecutively.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang

Closes #2298 from ijuma/kafka-4528-fix-test-async-send-timeout


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7f22ab65
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7f22ab65
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7f22ab65

Branch: refs/heads/trunk
Commit: 7f22ab6550813124ce2bb5254a925c0da3eb1ab9
Parents: 9e9e6d8
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Jan 3 09:11:07 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jan 3 09:11:07 2017 -0800

----------------------------------------------------------------------
 .../unit/kafka/producer/ProducerTest.scala      | 30 +++++++++++---------
 1 file changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7f22ab65/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index ec51e20..769ea33 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -25,7 +25,7 @@ import kafka.admin.AdminUtils
 import kafka.api.FetchRequestBuilder
 import kafka.common.FailedToSendMessageException
 import kafka.consumer.SimpleConsumer
-import kafka.message.Message
+import kafka.message.{Message, MessageAndOffset}
 import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import kafka.utils._
@@ -77,10 +77,6 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
     server2 = TestUtils.createServer(config2)
     servers = List(server1,server2)
 
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", server1.boundPort().toString)
-
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
   }
@@ -288,11 +284,11 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
   def testAsyncSendCanCorrectlyFailWithTimeout() {
     val topic = "new-topic"
     // create topics in ZK
-    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)),
servers = servers)
+    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0, 1)),
servers = servers)
 
     val timeoutMs = 500
     val props = new Properties()
-    props.put("request.timeout.ms", String.valueOf(timeoutMs))
+    props.put("request.timeout.ms", timeoutMs.toString)
     props.put("request.required.acks", "1")
     props.put("message.send.max.retries", "0")
     props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
@@ -306,11 +302,16 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
     // do a simple test to make sure plumbing is okay
     try {
       // this message should be assigned to partition 0 whose leader is on broker 0
-      producer.send(new KeyedMessage[String, String](topic, "test", "test"))
-      // cross check if brokers got the messages
-      val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0,
10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+      producer.send(new KeyedMessage(topic, "test", "test"))
+      // cross check if the broker received the messages
+      // we need the loop because the broker won't return the message until it has been replicated
and the producer is
+      // using acks=1
+      var messageSet1: Iterator[MessageAndOffset] = null
+      TestUtils.waitUntilTrue(() => {
+        val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0,
0, 10000).build())
+        messageSet1 = response1.messageSet(topic, 0).iterator
+        messageSet1.hasNext
+      }, "Message set should have 1 message")
       assertEquals(ByteBuffer.wrap("test".getBytes), messageSet1.next.message.payload)
 
       // stop IO threads and request handling, but leave networking operational
@@ -320,8 +321,9 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       val t1 = Time.SYSTEM.milliseconds
       try {
         // this message should be assigned to partition 0 whose leader is on broker 0, but
-        // broker 0 will not response within timeoutMs millis.
-        producer.send(new KeyedMessage[String, String](topic, "test", "test"))
+        // broker 0 will not respond within timeoutMs millis.
+        producer.send(new KeyedMessage(topic, "test", "test"))
+        fail("Exception should have been thrown")
       } catch {
         case _: FailedToSendMessageException => /* success */
       }


Mime
View raw message