kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Cleanup in tests to avoid threads being left behind
Date Sat, 27 May 2017 09:18:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0bc4f75ee -> eb3aae7a0


MINOR: Cleanup in tests to avoid threads being left behind

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3146 from rajinisivaram/MINOR-test-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb3aae7a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb3aae7a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb3aae7a

Branch: refs/heads/trunk
Commit: eb3aae7a058845852024fb0a551e1d2917ac121c
Parents: 0bc4f75
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Sat May 27 10:17:57 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat May 27 10:17:57 2017 +0100

----------------------------------------------------------------------
 .../admin/ResetConsumerGroupOffsetTest.scala    | 95 ++++++++++----------
 .../controller/ControllerEventManagerTest.scala |  3 +-
 2 files changed, 52 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eb3aae7a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index d58231e..67d03e9 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -25,7 +25,8 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.junit.{Before, Test}
+import org.junit.{Before, After, Test}
+import scala.collection.mutable.ArrayBuffer
 
 /**
   * Test cases by:
@@ -44,6 +45,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   val topic2 = "foo2"
   val group = "test.group"
   val props = new Properties
+  val consumerGroupServices = new ArrayBuffer[KafkaConsumerGroupService]
+  val executors = new ArrayBuffer[ConsumerGroupExecutor]
 
   /**
     * Implementations must override this method to return a set of KafkaConfigs. This method
will be invoked for every
@@ -58,20 +61,29 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     props.setProperty("group.id", group)
   }
 
+  @After
+  override def tearDown() {
+    try {
+      executors.foreach(_.shutdown())
+      consumerGroupServices.foreach(_.close())
+    } finally {
+      super.tearDown()
+    }
+  }
+
   @Test
   def testResetOffsetsNotExistingGroup() {
-    new ConsumerGroupExecutor(brokerList, 1, group, topic1)
+    createConsumerGroupExecutor(brokerList, 1, group, topic1)
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group",
"--all-topics")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
       val assignmentsToReset = consumerGroupCommand.resetOffsets()
       assignmentsToReset == Map.empty
     }, "Expected to have an empty assignations map.")
 
-    consumerGroupCommand.close()
   }
 
   @Test
@@ -88,9 +100,9 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
-    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic1)
+    val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
 
     TestUtils.waitUntilTrue(() => {
       val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
@@ -109,7 +121,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group",
group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute")
     val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
-    val consumerGroupCommand1 = new KafkaConsumerGroupService(opts1)
+    val consumerGroupCommand1 = createConsumerGroupService(opts1)
 
     TestUtils.waitUntilTrue(() => {
       val assignmentsToReset = consumerGroupCommand1.resetOffsets()
@@ -119,7 +131,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     printConsumerGroup()
 
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
@@ -134,9 +145,9 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
-    val executor = new ConsumerGroupExecutor(brokerList, 1, group, topic1)
+    val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1)
 
     TestUtils.waitUntilTrue(() => {
       val (_, assignmentsOption) = consumerGroupCommand.describeGroup()
@@ -154,7 +165,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group",
group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute")
     val opts1 = new ConsumerGroupCommandOptions(cgcArgs1)
-    val consumerGroupCommand1 = new KafkaConsumerGroupService(opts1)
+    val consumerGroupCommand1 = createConsumerGroupService(opts1)
 
     TestUtils.waitUntilTrue(() => {
       val assignmentsToReset = consumerGroupCommand1.resetOffsets()
@@ -164,14 +175,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     printConsumerGroup()
 
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsByDuration() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--by-duration", "PT1M", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -185,14 +195,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     printConsumerGroup()
 
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsByDurationToEarliest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--by-duration", "PT0.1S", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -205,14 +214,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsToEarliest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--to-earliest", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -225,14 +233,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsToLatest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--to-latest", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -248,14 +255,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsToCurrentOffset() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -271,12 +277,11 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService,
numConsumers: Int = 1, topic: String, totalMessages: Int) {
     TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000)
-    val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic)
+    val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic)
 
 
     TestUtils.waitUntilTrue(() => {
@@ -299,7 +304,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
   def testResetOffsetsToSpecificOffset() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--to-offset", "1", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -313,14 +318,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsShiftPlus() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--shift-by", "50", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -335,14 +339,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsShiftMinus() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--shift-by", "-50", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -358,14 +361,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsShiftByLowerThanEarliest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--shift-by", "-150", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -380,14 +382,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsShiftByHigherThanLatest() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--shift-by", "150", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -402,14 +403,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsToEarliestOnOneTopic() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--topic", topic1, "--to-earliest", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
 
@@ -422,14 +422,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsToEarliestOnOneTopicAndPartition() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--topic", String.format("%s:1", topic1), "--to-earliest", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 2, 1)
 
@@ -442,7 +441,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   @Test
@@ -453,7 +451,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
       "--topic", topic2,
       "--to-earliest", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 1, 1)
     AdminUtils.createTopic(zkUtils, topic2, 1, 1)
@@ -470,7 +468,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
     AdminUtils.deleteTopic(zkUtils, topic2)
-    consumerGroupCommand.close()
   }
 
   @Test
@@ -481,7 +478,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
       "--topic", String.format("%s:1", topic2),
       "--to-earliest", "--execute")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 2, 1)
     AdminUtils.createTopic(zkUtils, topic2, 2, 1)
@@ -498,14 +495,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
     AdminUtils.deleteTopic(zkUtils, topic2)
-    consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsExportImportPlan() {
     val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics", "--to-earliest", "--export")
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
+    val consumerGroupCommand = createConsumerGroupService(opts)
 
     AdminUtils.createTopic(zkUtils, topic1, 2, 1)
 
@@ -524,8 +520,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group",
group, "--all-topics", "--to-earliest", "--from-file", file.getCanonicalPath)
     val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec)
-    val consumerGroupCommandExec = new KafkaConsumerGroupService(optsExec)
-
+    val consumerGroupCommandExec = createConsumerGroupService(optsExec)
 
     TestUtils.waitUntilTrue(() => {
         val assignmentsToReset = consumerGroupCommandExec.resetOffsets()
@@ -536,7 +531,6 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
     AdminUtils.deleteTopic(zkUtils, topic1)
-    consumerGroupCommand.close()
   }
 
   private def printConsumerGroup() {
@@ -544,6 +538,17 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     ConsumerGroupCommand.main(cgcArgs)
   }
 
+  private def createConsumerGroupExecutor(brokerList: String, numConsumers: Int, groupId:
String, topic: String): ConsumerGroupExecutor = {
+    val executor = new ConsumerGroupExecutor(brokerList, numConsumers, groupId, topic)
+    executors += executor
+    executor
+  }
+
+  private def createConsumerGroupService(opts: ConsumerGroupCommandOptions): KafkaConsumerGroupService
= {
+    val service = new KafkaConsumerGroupService(opts)
+    consumerGroupServices += service
+    service
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb3aae7a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index fccb566..727c4f3 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -24,7 +24,7 @@ import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Timer
 import kafka.utils.TestUtils
 import org.easymock.{EasyMock, IAnswer}
-import org.junit.Test
+import org.junit.{After, Test}
 import org.junit.Assert.{assertEquals, fail}
 
 import scala.collection.JavaConverters._
@@ -33,6 +33,7 @@ class ControllerEventManagerTest {
 
   private var controllerEventManager: ControllerEventManager = _
 
+  @After
   def tearDown(): Unit = {
     if (controllerEventManager != null)
       controllerEventManager.close()


Mime
View raw message