kafka-commits mailing list archives

From jun...@apache.org
Subject git commit: kafka-1412; transient unit test failure in ProducerSendTest.testAutoCreateTopic; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede
Date Thu, 01 May 2014 23:35:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 198c793ab -> 0062d4cc9


kafka-1412; transient unit test failure in ProducerSendTest.testAutoCreateTopic; patched by Jun Rao; reviewed by Guozhang Wang and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0062d4cc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0062d4cc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0062d4cc

Branch: refs/heads/trunk
Commit: 0062d4cc94843bfc67b1cb03bad708a8121f9d06
Parents: 198c793
Author: Jun Rao <junrao@gmail.com>
Authored: Thu May 1 16:34:50 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu May 1 16:35:18 2014 -0700

----------------------------------------------------------------------
 .../kafka/api/ProducerFailureHandlingTest.scala | 21 +++++-------------
 .../kafka/api/ProducerSendTest.scala            | 18 +++++----------
 .../test/scala/unit/kafka/utils/TestUtils.scala | 23 ++++++++++++++++++++
 3 files changed, 33 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
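
In short, the patch deletes the test-local makeProducer helper, routes new-producer construction in these tests through a shared TestUtils.createNewProducer, and has testAutoCreateTopic ask for retries = 5 (with a 1s backoff), presumably so a send that races with topic auto-creation is retried instead of failing the test. A minimal sketch of how a test might call the new helper, assuming brokerList is in scope (e.g. TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) and org.apache.kafka.clients.producer.ProducerRecord is imported; the helper name and parameters come from the diff below, while the topic, key, and value are placeholders:

    // hypothetical usage of the helper added in this commit
    val producer = TestUtils.createNewProducer(
      brokerList,                      // broker list of the test cluster
      acks = -1,                       // helper default: wait for all in-sync replicas
      blockOnBufferFull = false,
      bufferSize = 1024L * 1024L,
      retries = 5)                     // retry sends that hit a not-yet-ready, auto-created topic
    try {
      // byte-array key/value, matching the new producer API at this point in trunk
      val record = new ProducerRecord("test-topic", "key".getBytes, "value".getBytes)
      val offset = producer.send(record).get().offset()   // block until the send is acked
    } finally {
      producer.close()
    }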


http://git-wip-us.apache.org/repos/asf/kafka/blob/0062d4cc/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 839fd27..a993e8c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -63,18 +63,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"
 
-  // TODO: move this function to TestUtils after we have server dependant on clients
-  private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long,
-                           blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = {
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
-    producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
-    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
-    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
-    return new KafkaProducer(producerProps)
-  }
-
   override def setUp() {
     super.setUp()
     server1 = TestUtils.createServer(config1)
@@ -85,10 +73,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "")
     consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "")
 
-    producer1 = makeProducer(brokerList, 0, 3000, false, bufferSize); // produce with ack=0
-    producer2 = makeProducer(brokerList, 1, 3000, false, bufferSize); // produce with ack=1
-    producer3 = makeProducer(brokerList, -1, 3000, false, bufferSize); // produce with ack=-1
-    producer4 = makeProducer("localhost:8686,localhost:4242", 1, 3000, false, bufferSize); // produce with incorrect broker list
+    producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = bufferSize);
+    producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
+    producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = bufferSize)
+    // producer with incorrect broker list
+    producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
   }
 
   override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0062d4cc/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 2230333..af11a49 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -90,9 +90,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testSendOffset() {
-    val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
-    var producer = new KafkaProducer(props)
+    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     val callback = new CheckErrorCallback
 
@@ -148,9 +146,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testClose() {
-    val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
-    var producer = new KafkaProducer(props)
+    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     try {
       // create topic
@@ -186,10 +182,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testSendToPartition() {
-    val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
-    props.put(ProducerConfig.ACKS_CONFIG, "-1")
-    var producer = new KafkaProducer(props)
+    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
 
     try {
       // create topic
@@ -244,9 +237,8 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
    */
   @Test
   def testAutoCreateTopic() {
-    val props = new Properties()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
-    var producer = new KafkaProducer(props)
+    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)),
+                                            retries = 5)
 
     try {
       // Send a message to auto-create the topic
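
For reference, with retries = 5 the producer built for testAutoCreateTopic ends up configured roughly as sketched below; every config key and value is copied from the createNewProducer implementation added in the TestUtils.scala hunk that follows, and only the broker list string is a placeholder:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")     // placeholder broker list
    props.put(ProducerConfig.ACKS_CONFIG, "-1")                              // helper default
    props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "3000")          // helper default
    props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")            // helper default
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (1024L * 1024L).toString) // helper default
    props.put(ProducerConfig.RETRIES_CONFIG, "5")                            // retries requested by the test
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000")                // fixed 1s backoff in the helper
    val producer = new KafkaProducer(props)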

http://git-wip-us.apache.org/repos/asf/kafka/blob/0062d4cc/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 49c7790..00bfba4 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -42,6 +42,7 @@ import kafka.producer.ProducerConfig
 
 import junit.framework.AssertionFailedError
 import junit.framework.Assert._
+import org.apache.kafka.clients.producer.KafkaProducer
 
 /**
  * Utility functions to help with testing
@@ -355,6 +356,28 @@ object TestUtils extends Logging {
   }
 
   /**
+   * Create a (new) producer with a few pre-configured properties.
+   */
+  def createNewProducer(brokerList: String,
+                        acks: Int = -1,
+                        metadataFetchTimeout: Long = 3000L,
+                        blockOnBufferFull: Boolean = true,
+                        bufferSize: Long = 1024L * 1024L,
+                        retries: Int = 0) : KafkaProducer = {
+    import org.apache.kafka.clients.producer.ProducerConfig
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
+    producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
+    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
+    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000")
+    return new KafkaProducer(producerProps)
+  }
+
+  /**
    * Create a default producer config properties map with the given metadata broker list
    */
   def getProducerConfig(brokerList: String): Properties = {

