kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1433; transient unit test failure in ZookeeperConsumerConnectorTest; patched by Jun Rao; reviewed by Guozhang Wang
Date Thu, 01 May 2014 15:54:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3de97c7c4 -> 6b4217d19


kafka-1433; transient unit test failure in ZookeeperConsumerConnectorTest; patched by Jun Rao; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b4217d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b4217d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b4217d1

Branch: refs/heads/trunk
Commit: 6b4217d192275e35b6b6d1a73ef37dcba46d1ac1
Parents: 3de97c7
Author: Jun Rao <junrao@gmail.com>
Authored: Thu May 1 08:54:03 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu May 1 08:54:03 2014 -0700

----------------------------------------------------------------------
 .../ZookeeperConsumerConnectorTest.scala        |  34 +++---
 .../kafka/integration/PrimitiveApiTest.scala    |  42 ++++---
 .../unit/kafka/producer/AsyncProducerTest.scala |  22 ++--
 .../unit/kafka/producer/ProducerTest.scala      | 117 ++++++++++---------
 .../test/scala/unit/kafka/utils/TestUtils.scala |  17 ++-
 5 files changed, 123 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4217d1/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 40a25a2..96fa0bd 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -343,12 +343,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                                     compression: CompressionCodec = NoCompressionCodec):
List[String] = {
     val header = "test-%d-%d".format(config.brokerId, partition)
     val props = new Properties()
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     props.put("compression.codec", compression.codec.toString)
-    props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-    props.put("serializer.class", classOf[StringEncoder].getName.toString)
-    val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
+    val producer: Producer[Int, String] =
+      createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
+                     encoder = classOf[StringEncoder].getName,
+                     keyEncoder = classOf[IntEncoder].getName,
+                     partitioner = classOf[FixedValuePartitioner].getName,
+                     producerProps = props)
+
     val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x)
     producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
     debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId,
topic, partition))
@@ -363,11 +365,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
                    numParts: Int): List[String]= {
     var messages: List[String] = Nil
     val props = new Properties()
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
-    props.put("key.serializer.class", classOf[IntEncoder].getName.toString)
-    props.put("serializer.class", classOf[StringEncoder].getName)
-    val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props))
+    props.put("compression.codec", compression.codec.toString)
+    val producer: Producer[Int, String] =
+      createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
+                     encoder = classOf[StringEncoder].getName,
+                     keyEncoder = classOf[IntEncoder].getName,
+                     partitioner = classOf[FixedValuePartitioner].getName,
+                     producerProps = props)
+
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
       producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
@@ -378,14 +383,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages
   }
 
-  def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec =
NoCompressionCodec): List[String]= {
-    var messages: List[String] = Nil
-    for(conf <- configs)
-      messages ++= sendMessages(conf, messagesPerNode, header, compression, numParts)
-    messages
-  }
-
-  def getMessages(nMessagesPerThread: Int, 
+  def getMessages(nMessagesPerThread: Int,
                   topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]=
{
     var messages: List[String] = Nil
     for((topic, messageStreams) <- topicMessageStreams) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4217d1/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 97e3b14..9f04bd3 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import junit.framework.Assert._
 import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
@@ -29,7 +29,9 @@ import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.admin.AdminUtils
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{StaticPartitioner, TestUtils, Utils}
+import kafka.serializer.StringEncoder
+import java.util.Properties
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -67,11 +69,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness
with
 
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
-    val props = producer.config.props.props
-    val config = new ProducerConfig(props)
 
-    val stringProducer1 = new Producer[String, String](config)
-    stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
+    producer.send(new KeyedMessage[String, String](topic, "test-message"))
 
     val replica = servers.head.replicaManager.getReplica(topic, 0).get
     assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
@@ -93,11 +92,16 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness
with
 
   def testDefaultEncoderProducerAndFetchWithCompression() {
     val topic = "test-topic"
-    val props = producer.config.props.props
+    val props = new Properties()
     props.put("compression.codec", "gzip")
-    val config = new ProducerConfig(props)
 
-    val stringProducer1 = new Producer[String, String](config)
+    val stringProducer1 = TestUtils.createProducer[String, String](
+      TestUtils.getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props)
+
     stringProducer1.send(new KeyedMessage[String, String](topic, "test-message"))
 
     val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
@@ -172,10 +176,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness
with
   }
 
   def testProduceAndMultiFetch() {
-    val props = producer.config.props.props
-    val config = new ProducerConfig(props)
-    val noCompressionProducer = new Producer[String, String](config)
-    produceAndMultiFetch(noCompressionProducer)
+    produceAndMultiFetch(producer)
   }
 
   private def multiProduce(producer: Producer[String, String]) {
@@ -201,10 +202,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness
with
   }
 
   def testMultiProduce() {
-    val props = producer.config.props.props
-    val config = new ProducerConfig(props)
-    val noCompressionProducer = new Producer[String, String](config)
-    multiProduce(noCompressionProducer)
+    multiProduce(producer)
   }
 
   def testConsumerEmptyTopic() {
@@ -218,9 +216,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness
with
   def testPipelinedProduceRequests() {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
     createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
-    val props = producer.config.props.props
+    val props = new Properties()
     props.put("request.required.acks", "0")
-    val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props))
+    val pipelinedProducer: Producer[String, String] =
+      TestUtils.createProducer[String, String](
+        TestUtils.getBrokerListStrFromConfigs(configs),
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[StringEncoder].getName,
+        partitioner = classOf[StaticPartitioner].getName,
+        producerProps = props)
 
     // send some messages
     val messages = new mutable.HashMap[String, Seq[String]]

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4217d1/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 6c3feac..906600c 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -84,15 +84,11 @@ class AsyncProducerTest extends JUnit3Suite {
 
   @Test
   def testProduceAfterClosed() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    props.put("producer.type", "async")
-    props.put("batch.num.messages", "1")
-
-    val config = new ProducerConfig(props)
     val produceData = getProduceData(10)
-    val producer = new Producer[String, String](config)
+    val producer = createProducer[String, String](
+      getBrokerListStrFromConfigs(configs),
+      encoder = classOf[StringEncoder].getName)
+
     producer.close
 
     try {
@@ -303,10 +299,14 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testIncompatibleEncoder() {
     val props = new Properties()
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    val config = new ProducerConfig(props)
+    // no need to retry since the send will always fail
+    props.put("message.send.max.retries", "0")
+    val producer= createProducer[String, String](
+      brokerList = getBrokerListStrFromConfigs(configs),
+      encoder = classOf[DefaultEncoder].getName,
+      keyEncoder = classOf[DefaultEncoder].getName,
+      producerProps = props)
 
-    val producer=new Producer[String, String](config)
     try {
       producer.send(getProduceData(1): _*)
       fail("Should fail with ClassCastException due to incompatible Encoder")

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4217d1/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index c3da69d..b61c0b8 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -34,7 +34,7 @@ import org.junit.Assert.assertTrue
 import org.junit.Assert.assertFalse
 import org.junit.Assert.assertEquals
 import kafka.common.{ErrorMapping, FailedToSendMessageException}
-
+import kafka.serializer.StringEncoder
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private val brokerId1 = 0
@@ -94,26 +94,30 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
     val topic = "new-topic"
     TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers
= servers)
 
-    val props1 = new util.Properties()
-    props1.put("metadata.broker.list", "localhost:80,localhost:81")
-    props1.put("serializer.class", "kafka.serializer.StringEncoder")
-    val producerConfig1 = new ProducerConfig(props1)
-    val producer1 = new Producer[String, String](producerConfig1)
+    val props = new Properties()
+    // no need to retry since the send will always fail
+    props.put("message.send.max.retries", "0")
+    val producer1 = TestUtils.createProducer[String, String](
+        brokerList = "localhost:80,localhost:81",
+        encoder = classOf[StringEncoder].getName,
+        keyEncoder = classOf[StringEncoder].getName,
+        producerProps = props)
+
     try{
       producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
       fail("Test should fail because the broker list provided are not valid")
     } catch {
-      case e: FailedToSendMessageException =>
+      case e: FailedToSendMessageException => // this is expected
       case oe: Throwable => fail("fails with exception", oe)
     } finally {
       producer1.close()
     }
 
-    val props2 = new util.Properties()
-    props2.put("metadata.broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq(
config1)))
-    props2.put("serializer.class", "kafka.serializer.StringEncoder")
-    val producerConfig2= new ProducerConfig(props2)
-    val producer2 = new Producer[String, String](producerConfig2)
+    val producer2 = TestUtils.createProducer[String, String](
+      brokerList = "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq(config1)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName)
+
     try{
       producer2.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
@@ -122,11 +126,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
       producer2.close()
     }
 
-    val props3 = new util.Properties()
-    props3.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1,
config2)))
-    props3.put("serializer.class", "kafka.serializer.StringEncoder")
-    val producerConfig3 = new ProducerConfig(props3)
-    val producer3 = new Producer[String, String](producerConfig3)
+    val producer3 =  TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName)
+
     try{
       producer3.send(new KeyedMessage[String, String](topic, "test", "test1"))
     } catch {
@@ -139,26 +143,19 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
   @Test
   def testSendToNewTopic() {
     val props1 = new util.Properties()
-    props1.put("serializer.class", "kafka.serializer.StringEncoder")
-    props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props1.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1,
config2)))
     props1.put("request.required.acks", "2")
-    props1.put("request.timeout.ms", "1000")
-
-    val props2 = new util.Properties()
-    props2.putAll(props1)
-    props2.put("request.required.acks", "3")
-    props2.put("request.timeout.ms", "1000")
-
-    val producerConfig1 = new ProducerConfig(props1)
-    val producerConfig2 = new ProducerConfig(props2)
 
     val topic = "new-topic"
     // create topic with 1 partition and await leadership
     TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers
= servers)
 
-    val producer1 = new Producer[String, String](producerConfig1)
-    val producer2 = new Producer[String, String](producerConfig2)
+    val producer1 = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props1)
+
     // Available partition ids should be 0.
     producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
     producer1.send(new KeyedMessage[String, String](topic, "test", "test2"))
@@ -179,6 +176,18 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
     assertEquals(new Message(bytes = "test2".getBytes, key = "test".getBytes), messageSet(1).message)
     producer1.close()
 
+    val props2 = new util.Properties()
+    props2.put("request.required.acks", "3")
+    // no need to retry since the send will always fail
+    props2.put("message.send.max.retries", "0")
+
+    val producer2 = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props2)
+
     try {
       producer2.send(new KeyedMessage[String, String](topic, "test", "test2"))
       fail("Should have timed out for 3 acks.")
@@ -197,19 +206,23 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
   @Test
   def testSendWithDeadBroker() {
     val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("request.timeout.ms", "2000")
     props.put("request.required.acks", "1")
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1,
config2)))
+    // No need to retry since the topic will be created beforehand and normal send will succeed on the first try.
+    // Reducing the retries will save the time on the subsequent failure test.
+    props.put("message.send.max.retries", "0")
 
     val topic = "new-topic"
     // create topic
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0),
1->Seq(0), 2->Seq(0), 3->Seq(0)),
                           servers = servers)
 
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
+    val producer = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props)
+
     try {
       // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only
       // on broker 0
@@ -253,14 +266,16 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
   def testAsyncSendCanCorrectlyFailWithTimeout() {
     val timeoutMs = 500
     val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
     props.put("request.timeout.ms", String.valueOf(timeoutMs))
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1,
config2)))
     props.put("request.required.acks", "1")
+    props.put("message.send.max.retries", "0")
     props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
+    val producer = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName,
+      producerProps = props)
 
     val topic = "new-topic"
     // create topics in ZK
@@ -296,20 +311,18 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with
Logging{
     }
     val t2 = SystemTime.milliseconds
 
-    // make sure we don't wait fewer than numRetries*timeoutMs milliseconds
-    // we do this because the DefaultEventHandler retries a number of times
-    assertTrue((t2-t1) >= timeoutMs*config.messageSendMaxRetries)
+    // make sure we don't wait fewer than timeoutMs
+    assertTrue((t2-t1) >= timeoutMs)
   }
   
   @Test
   def testSendNullMessage() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1,
config2)))
-    
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
+    val producer = TestUtils.createProducer[String, String](
+      brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+      encoder = classOf[StringEncoder].getName,
+      keyEncoder = classOf[StringEncoder].getName,
+      partitioner = classOf[StaticPartitioner].getName)
+
     try {
 
       // create topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4217d1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 384c74e..49c7790 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -342,13 +342,12 @@ object TestUtils extends Logging {
                            keyEncoder: String = classOf[DefaultEncoder].getName,
                            partitioner: String = classOf[DefaultPartitioner].getName,
                            producerProps: Properties = null): Producer[K, V] = {
-    val props: Properties =
-    if (producerProps == null) {
-      getProducerConfig(brokerList)
-    } else {
-      producerProps.put("metadata.broker.list", brokerList)
-      producerProps
-    }
+    val props: Properties = getProducerConfig(brokerList)
+
+    //override any explicitly specified properties
+    if (producerProps != null)
+      props.putAll(producerProps)
+
     props.put("serializer.class", encoder)
     props.put("key.serializer.class", keyEncoder)
     props.put("partitioner.class", partitioner)
@@ -361,9 +360,9 @@ object TestUtils extends Logging {
   def getProducerConfig(brokerList: String): Properties = {
     val props = new Properties()
     props.put("metadata.broker.list", brokerList)
-    props.put("message.send.max.retries", "3")
+    props.put("message.send.max.retries", "5")
     props.put("retry.backoff.ms", "1000")
-    props.put("request.timeout.ms", "500")
+    props.put("request.timeout.ms", "2000")
     props.put("request.required.acks", "-1")
     props.put("send.buffer.bytes", "65536")
     props.put("connect.timeout.ms", "100000")


Mime
View raw message