kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5371; Increase request timeout for producer used by testReachableServer
Date Sun, 04 Jun 2017 14:31:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 c4b6df975 -> 384293911


KAFKA-5371; Increase request timeout for producer used by testReachableServer

500ms is low for a shared Jenkins environment.

Also removed the try/catch blocks that simply obscured
the underlying error.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Apurva Mehta <apurva@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3225 from ijuma/kafka-5371-flaky-testReachableServer

(cherry picked from commit 9b58372dcce5dc96826d4786123513a4d8c7b39f)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/38429391
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/38429391
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/38429391

Branch: refs/heads/0.11.0
Commit: 384293911ec561c4e96ba74ffedd2a79c75045bf
Parents: c4b6df9
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Sun Jun 4 15:28:06 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sun Jun 4 15:31:42 2017 +0100

----------------------------------------------------------------------
 .../unit/kafka/producer/SyncProducerTest.scala  | 41 +++++++-------------
 .../test/scala/unit/kafka/utils/TestUtils.scala |  3 +-
 2 files changed, 16 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/38429391/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 41a8a6c..cde49de 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -52,38 +52,25 @@ class SyncProducerTest extends KafkaServerTestHarness {
   @Test
   def testReachableServer() {
     val server = servers.head
-
     val props = TestUtils.getSyncProducerConfig(boundPort(server))
 
-
     val producer = new SyncProducer(new SyncProducerConfig(props))
+
     val firstStart = Time.SYSTEM.milliseconds
-    try {
-      val response = producer.send(produceRequest("test", 0,
-        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),
acks = 1))
-      assertNotNull(response)
-    } catch {
-      case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
-    }
-    val firstEnd = Time.SYSTEM.milliseconds
-    assertTrue((firstEnd-firstStart) < 2000)
+    var response = producer.send(produceRequest("test", 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),
acks = 1))
+    assertNotNull(response)
+    assertTrue((Time.SYSTEM.milliseconds - firstStart) < 12000)
+
     val secondStart = Time.SYSTEM.milliseconds
-    try {
-      val response = producer.send(produceRequest("test", 0,
-        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),
acks = 1))
-      assertNotNull(response)
-    } catch {
-      case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
-    }
-    val secondEnd = Time.SYSTEM.milliseconds
-    assertTrue((secondEnd-secondStart) < 2000)
-    try {
-      val response = producer.send(produceRequest("test", 0,
-        new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),
acks = 1))
-      assertNotNull(response)
-    } catch {
-      case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
-    }
+    response = producer.send(produceRequest("test", 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),
acks = 1))
+    assertNotNull(response)
+    assertTrue((Time.SYSTEM.milliseconds - secondStart) < 12000)
+
+    response = producer.send(produceRequest("test", 0,
+      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),
acks = 1))
+    assertNotNull(response)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/38429391/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a0f4762..aae58cc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -643,11 +643,12 @@ object TestUtils extends Logging {
     props
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release",
"0.11.0.0")
   def getSyncProducerConfig(port: Int): Properties = {
     val props = new Properties()
     props.put("host", "localhost")
     props.put("port", port.toString)
-    props.put("request.timeout.ms", "500")
+    props.put("request.timeout.ms", "10000")
     props.put("request.required.acks", "1")
     props.put("serializer.class", classOf[StringEncoder].getName)
     props


Mime
View raw message