kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3217: Close producers in unit tests
Date Tue, 09 Feb 2016 17:49:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9cac38c02 -> 9f5a1f876


KAFKA-3217: Close producers in unit tests

Producers that are not closed auto-create topics in subsequent tests when Kafka server port
is reused. Added missing close().

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #882 from rajinisivaram/KAFKA-3217


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f5a1f87
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f5a1f87
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f5a1f87

Branch: refs/heads/trunk
Commit: 9f5a1f87667c23db557a712d51c45541372f3c5d
Parents: 9cac38c
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Tue Feb 9 09:49:32 2016 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Feb 9 09:49:32 2016 -0800

----------------------------------------------------------------------
 .../kafka/api/AuthorizerIntegrationTest.scala            |  2 ++
 .../integration/kafka/api/BaseProducerSendTest.scala     | 11 +++++++++--
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9f5a1f87/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a54cbef..db2040f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -154,6 +154,8 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
 
   @After
   override def tearDown() = {
+    producers.foreach(_.close())
+    consumers.foreach(_.close())
     removeAllAcls
     super.tearDown()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f5a1f87/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 29291d4..42928a3 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.errors.SerializationException
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import scala.collection.mutable.Buffer
 
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
@@ -43,6 +44,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
+  private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
   private val topic = "topic"
   private val numRecords = 100
@@ -60,13 +62,18 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   override def tearDown() {
     consumer1.close()
     consumer2.close()
+    // Ensure that all producers are closed since unclosed producers impact other tests when
Kafka server ports are reused
+    producers.foreach(_.close())
 
     super.tearDown()
   }
 
-  private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props:
Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] =
-    TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile
= trustStoreFile,
+  private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props:
Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
+    val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol,
trustStoreFile = trustStoreFile,
       retries = retries, lingerMs = lingerMs, props = props)
+    producers += producer
+    producer
+  }
 
   /**
    * testSendOffset checks the basic send API behavior


Mime
View raw message