kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch trunk updated: MINOR: Use explicit construction of clients in IntegrationTestHarness (#5443)
Date: Tue, 14 Aug 2018 21:34:49 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a9631e  MINOR: Use explicit construction of clients in IntegrationTestHarness (#5443)
7a9631e is described below

commit 7a9631e6349239b2dff3728d9237e6a17aa5ad81
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Tue Aug 14 14:34:42 2018 -0700

    MINOR: Use explicit construction of clients in IntegrationTestHarness (#5443)
    
    Pre-initialization of clients in IntegrationTestHarness is a cause of significant confusion and has resulted in a bunch of inconsistent client creation patterns. This patch requires test cases to create needed clients explicitly and makes the creation logic more consistent.
    
    Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../kafka/api/AdminClientIntegrationTest.scala     |   50 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  280 +++---
 .../integration/kafka/api/BaseConsumerTest.scala   |   41 +-
 .../kafka/api/BaseProducerSendTest.scala           |   17 +-
 .../integration/kafka/api/BaseQuotaTest.scala      |   20 +-
 .../integration/kafka/api/ClientIdQuotaTest.scala  |    5 +-
 .../integration/kafka/api/ConsumerBounceTest.scala |  102 +-
 .../kafka/api/CustomQuotaCallbackTest.scala        |   27 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |  102 +-
 .../api/GroupCoordinatorIntegrationTest.scala      |    4 +-
 .../kafka/api/IntegrationTestHarness.scala         |   99 +-
 .../kafka/api/LegacyAdminClientTest.scala          |   14 +-
 .../integration/kafka/api/LogAppendTimeTest.scala  |    4 +-
 .../scala/integration/kafka/api/MetricsTest.scala  |   38 +-
 .../kafka/api/PlaintextConsumerTest.scala          | 1023 ++++++++++----------
 .../api/PlaintextEndToEndAuthorizationTest.scala   |    3 +-
 .../kafka/api/PlaintextProducerSendTest.scala      |   16 +-
 .../kafka/api/ProducerCompressionTest.scala        |    3 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |   34 +-
 .../kafka/api/SaslEndToEndAuthorizationTest.scala  |   20 +-
 .../kafka/api/SaslMultiMechanismConsumerTest.scala |   18 +-
 .../kafka/api/TransactionsBounceTest.scala         |   22 +-
 .../integration/kafka/api/TransactionsTest.scala   |   21 +-
 .../kafka/api/UserClientIdQuotaTest.scala          |    5 +-
 .../integration/kafka/api/UserQuotaTest.scala      |    5 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   39 +-
 .../kafka/server/GssapiAuthenticationTest.scala    |    3 -
 .../kafka/server/ScramServerStartupTest.scala      |    3 +-
 .../group/GroupMetadataManagerTest.scala           |    3 -
 .../integration/UncleanLeaderElectionTest.scala    |    9 +-
 .../server/AlterReplicaLogDirsRequestTest.scala    |    5 +-
 .../scala/unit/kafka/server/BaseRequestTest.scala  |    9 +-
 .../kafka/server/DescribeLogDirsRequestTest.scala  |    5 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala |   14 +-
 .../unit/kafka/server/LogDirFailureTest.scala      |   24 +-
 .../unit/kafka/server/MetadataRequestTest.scala    |    7 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |   12 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  144 +--
 38 files changed, 1081 insertions(+), 1169 deletions(-)

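The shape of the change, in outline. This is a minimal sketch of the before/after pattern as it appears in the test cases in the diff below; IntegrationTestHarness and its createProducer/createConsumer/sendRecords helpers are from Kafka's test sources, and the group id value here is illustrative:

    // Before: the harness pre-created a fixed number of clients and tests
    // pulled them from shared buffers, e.g.
    //   val consumer = consumers.head
    //   sendRecords(producers.head, 10, topicPartition)

    // After: each test constructs exactly the clients it needs. The harness
    // builds them from producerConfig/consumerConfig and still closes them
    // on tearDown.
    val producer = createProducer()
    val consumer = createConsumer()
    sendRecords(producer, 10, topicPartition)

    // Per-test settings are passed explicitly rather than by mutating a
    // shared, pre-initialized client:
    val props = new java.util.Properties()
    props.setProperty(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "test-group")
    val overriddenConsumer = createConsumer(configOverrides = props)
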
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 9055e68..20cf038 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -724,10 +724,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
     client = AdminClient.create(createConfig)
 
-    val consumer = consumers.head
+    val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
 
-    sendRecords(producers.head, 10, topicPartition)
+    val producer = createProducer()
+    sendRecords(producer, 10, topicPartition)
     consumer.seekToBeginning(Collections.singleton(topicPartition))
     assertEquals(0L, consumer.position(topicPartition))
 
@@ -752,9 +753,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
     client = AdminClient.create(createConfig)
 
-    subscribeAndWaitForAssignment(topic, consumers.head)
+    val consumer = createConsumer()
+    subscribeAndWaitForAssignment(topic, consumer)
 
-    sendRecords(producers.head, 10, topicPartition)
+    val producer = createProducer()
+    sendRecords(producer, 10, topicPartition)
     var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
     var lowWatermark: Option[Long] = Some(result.lowWatermarks.get(topicPartition).get.lowWatermark)
     assertEquals(Some(5), lowWatermark)
@@ -790,9 +793,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
     client = AdminClient.create(createConfig)
 
-    subscribeAndWaitForAssignment(topic, consumers.head)
+    val consumer = createConsumer()
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    val producer = createProducer()
+    sendRecords(producer, 10, topicPartition)
 
-    sendRecords(producers.head, 10, topicPartition)
     val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
     val lowWatermark = result.lowWatermarks.get(topicPartition).get.lowWatermark
     assertEquals(3L, lowWatermark)
@@ -824,7 +830,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     killBroker(followerIndex)
 
     client = AdminClient.create(createConfig)
-    sendRecords(producers.head, 100, topicPartition)
+    val producer = createProducer()
+    sendRecords(producer, 100, topicPartition)
 
     val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
     result.all().get()
@@ -840,7 +847,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
     // kill the same follower again, produce more records, and delete records beyond follower's LOE
     killBroker(followerIndex)
-    sendRecords(producers.head, 100, topicPartition)
+    sendRecords(producer, 100, topicPartition)
     val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava)
     result1.all().get()
     restartDeadBrokers()
@@ -852,7 +859,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     client = AdminClient.create(createConfig)
     createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
     val expectedLEO = 100
-    sendRecords(producers.head, expectedLEO, topicPartition)
+    val producer = createProducer()
+    sendRecords(producer, expectedLEO, topicPartition)
 
     // delete records to move log start offset
     val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
@@ -884,10 +892,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
     client = AdminClient.create(createConfig)
 
-    val consumer = consumers.head
+    val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
 
-    sendRecords(producers.head, 10, topicPartition)
+    val producer = createProducer()
+    sendRecords(producer, 10, topicPartition)
     assertEquals(0L, consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition).offset())
 
     var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
@@ -901,12 +910,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testConsumeAfterDeleteRecords(): Unit = {
-    val consumer = consumers.head
+    val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
 
     client = AdminClient.create(createConfig)
 
-    sendRecords(producers.head, 10, topicPartition)
+    val producer = createProducer()
+    sendRecords(producer, 10, topicPartition)
     var messageCount = 0
     TestUtils.waitUntilTrue(() => {
       messageCount += consumer.poll(0).count
@@ -932,11 +942,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testDeleteRecordsWithException(): Unit = {
-    subscribeAndWaitForAssignment(topic, consumers.head)
+    val consumer = createConsumer()
+    subscribeAndWaitForAssignment(topic, consumer)
 
     client = AdminClient.create(createConfig)
 
-    sendRecords(producers.head, 10, topicPartition)
+    val producer = createProducer()
+    sendRecords(producer, 10, topicPartition)
 
     assertEquals(5L, client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
       .lowWatermarks.get(topicPartition).get.lowWatermark)
@@ -1107,7 +1119,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
         new NewTopic(testTopicName, testNumPartitions, 1))).all().get()
       waitForTopics(client, List(testTopicName), List())
 
-      val producer = createProducer
+      val producer = createProducer()
       try {
         producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
       } finally {
@@ -1119,11 +1131,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       val newConsumerConfig = new Properties(consumerConfig)
       newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
       newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
-      val consumer = TestUtils.createConsumer(brokerList,
-        securityProtocol = this.securityProtocol,
-        trustStoreFile = this.trustStoreFile,
-        saslProperties = this.clientSaslProperties,
-        props = Some(newConsumerConfig))
+      val consumer = createConsumer(configOverrides = newConsumerConfig)
       try {
         // Start a consumer in a thread that will subscribe to a new group.
         val consumerThread = new Thread {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index faad071..7c341e6 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -95,17 +95,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)))
   val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
 
-
-  val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-  val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
-
-  val producerCount = 1
-  val consumerCount = 2
-  val producerConfig = new Properties
   val numRecords = 1
-
   val adminClients = Buffer[AdminClient]()
 
+  producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
+  producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3000")
+  consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
+
   override def propertyOverrides(properties: Properties): Unit = {
     properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
     properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
@@ -230,25 +226,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
     ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
     ApiKeys.CREATE_PARTITIONS -> topicAlterAcl
-
   )
 
   @Before
   override def setUp() {
-    super.setUp()
+    doSetup(createOffsetsTopic = false)
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
 
-    for (_ <- 0 until producerCount)
-      producers += TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-        maxBlockMs = 3000,
-        acks = 1)
-
-    for (_ <- 0 until consumerCount)
-      consumers += TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
+    TestUtils.createOffsetsTopic(zkClient, servers)
 
-    // create the consumer offset topic
-    createTopic(GROUP_METADATA_TOPIC_NAME, topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
     // create the test topic with all the brokers as replicas
     createTopic(topic)
     createTopic(deleteTopic)
@@ -256,9 +243,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @After
   override def tearDown() = {
-    producers.foreach(_.close())
-    consumers.foreach(_.wakeup())
-    consumers.foreach(_.close())
     adminClients.foreach(_.close())
     removeAllAcls()
     super.tearDown()
@@ -531,7 +515,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testProduceWithNoTopicAccess() {
     try {
-      sendRecords(numRecords, tp)
+      val producer = createProducer()
+      sendRecords(producer, numRecords, tp)
       fail("should have thrown exception")
     } catch {
       case _: TopicAuthorizationException => //expected
@@ -542,7 +527,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testProduceWithTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     try {
-      sendRecords(numRecords, tp)
+      val producer = createProducer()
+      sendRecords(producer, numRecords, tp)
       fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException =>
@@ -554,7 +540,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testProduceWithTopicRead() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     try {
-      sendRecords(numRecords, tp)
+      val producer = createProducer()
+      sendRecords(producer, numRecords, tp)
       fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException =>
@@ -565,7 +552,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testProduceWithTopicWrite() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(numRecords, tp)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
   }
 
   @Test
@@ -582,8 +570,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val topicPartition = new TopicPartition(createTopic, 0)
     val newTopicResource = Resource(Topic, createTopic, LITERAL)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
+    val producer = createProducer()
     try {
-      sendRecords(numRecords, topicPartition)
+      sendRecords(producer, numRecords, topicPartition)
       Assert.fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException =>
@@ -593,30 +582,35 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), resource)
 
-    sendRecords(numRecords, topicPartition)
+    sendRecords(producer, numRecords, topicPartition)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testConsumeUsingAssignWithNoAccess(): Unit = {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
-    this.consumers.head.assign(List(tp).asJava)
-    consumeRecords(this.consumers.head)
+    
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumeRecords(consumer)
   }
 
   @Test
   def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(): Unit = {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     try {
       // note this still depends on group access because we haven't set offsets explicitly, which means
       // they will first be fetched from the consumer coordinator (which requires group access)
-      this.consumers.head.assign(List(tp).asJava)
-      consumeRecords(this.consumers.head)
+      val consumer = createConsumer()
+      consumer.assign(List(tp).asJava)
+      consumeRecords(consumer)
       Assert.fail("should have thrown exception")
     } catch {
       case e: GroupAuthorizationException => assertEquals(group, e.groupId())
@@ -626,42 +620,48 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(): Unit = {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
 
     // in this case, we do an explicit seek, so there should be no need to query the coordinator at all
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.seekToBeginning(List(tp).asJava)
-    consumeRecords(this.consumers.head)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.seekToBeginning(List(tp).asJava)
+    consumeRecords(consumer)
   }
 
   @Test(expected = classOf[KafkaException])
   def testConsumeWithoutTopicDescribeAccess() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
-    this.consumers.head.assign(List(tp).asJava)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
 
     // the consumer should raise an exception if it receives UNKNOWN_TOPIC_OR_PARTITION
     // from the ListOffsets response when looking up the initial position.
-    consumeRecords(this.consumers.head)
+    consumeRecords(consumer)
   }
 
   @Test
   def testConsumeWithTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     try {
-      this.consumers.head.assign(List(tp).asJava)
-      consumeRecords(this.consumers.head)
+      val consumer = createConsumer()
+      consumer.assign(List(tp).asJava)
+      consumeRecords(consumer)
       Assert.fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
@@ -671,14 +671,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testConsumeWithTopicWrite() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     try {
-      this.consumers.head.assign(List(tp).asJava)
-      consumeRecords(this.consumers.head)
+      val consumer = createConsumer()
+      consumer.assign(List(tp).asJava)
+      consumeRecords(consumer)
       Assert.fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException =>
@@ -689,36 +691,43 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testConsumeWithTopicAndGroupRead() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
-    this.consumers.head.assign(List(tp).asJava)
-    consumeRecords(this.consumers.head)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumeRecords(consumer)
   }
 
   @Test
   def testPatternSubscriptionWithNoTopicAccess() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
-    this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
-    this.consumers.head.poll(50)
-    assertTrue(this.consumers.head.subscription.isEmpty)
+
+    val consumer = createConsumer()
+    consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+    consumer.poll(50)
+    assertTrue(consumer.subscription.isEmpty)
   }
 
   @Test
   def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
-    val consumer = consumers.head
+    val consumer = createConsumer()
     consumer.subscribe(Pattern.compile(topicPattern))
     try {
       consumeRecords(consumer)
@@ -731,18 +740,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testPatternSubscriptionWithTopicAndGroupRead() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
 
     // create an unmatched topic
     val unmatchedTopic = "unmatched"
     createTopic(unmatchedTopic)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)),  Resource(Topic, unmatchedTopic, LITERAL))
-    sendRecords(1, new TopicPartition(unmatchedTopic, part))
+    sendRecords(producer, 1, new TopicPartition(unmatchedTopic, part))
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
-    val consumer = consumers.head
+    val consumer = createConsumer()
     consumer.subscribe(Pattern.compile(topicPattern))
     consumeRecords(consumer)
 
@@ -759,35 +769,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testPatternSubscriptionMatchingInternalTopic() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
 
-    val consumerConfig = new Properties
     consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
-    val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
-      securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
-    try {
-      // ensure that internal topics are not included if no permission
-      consumer.subscribe(Pattern.compile(".*"))
-      consumeRecords(consumer)
-      assertEquals(Set(topic).asJava, consumer.subscription)
-
-      // now authorize the user for the internal topic and verify that we can subscribe
-      addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic,
-        GROUP_METADATA_TOPIC_NAME, LITERAL))
-      consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
-      consumer.poll(0)
-      assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala)
-    } finally consumer.close()
+    val consumer = createConsumer()
+    // ensure that internal topics are not included if no permission
+    consumer.subscribe(Pattern.compile(".*"))
+    consumeRecords(consumer)
+    assertEquals(Set(topic).asJava, consumer.subscription)
+
+    // now authorize the user for the internal topic and verify that we can subscribe
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic,
+      GROUP_METADATA_TOPIC_NAME, LITERAL))
+    consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
+    consumer.poll(0)
+    assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala)
   }
 
   @Test
   def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
@@ -795,10 +803,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val internalTopicResource = Resource(Topic, GROUP_METADATA_TOPIC_NAME, LITERAL)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
 
-    val consumerConfig = new Properties
     consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
-    val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
-      securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
+    val consumer = createConsumer()
     try {
       consumer.subscribe(Pattern.compile(".*"))
       // It is possible that the first call returns records of "topic" and the second call throws TopicAuthorizationException
@@ -807,22 +813,21 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       Assert.fail("Expected TopicAuthorizationException")
     } catch {
       case _: TopicAuthorizationException => //expected
-    } finally consumer.close()
+    }
   }
 
   @Test
   def testPatternSubscriptionNotMatchingInternalTopic() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, 1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
 
-    val consumerConfig = new Properties
     consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
-    val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
-      securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
+    val consumer = createConsumer()
     try {
       consumer.subscribe(Pattern.compile(topicPattern))
       consumeRecords(consumer)
@@ -848,9 +853,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val newTopicResource = Resource(Topic, newTopic, LITERAL)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource)
     addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
-    this.consumers.head.assign(List(topicPartition).asJava)
+    val consumer = createConsumer()
+    consumer.assign(List(topicPartition).asJava)
     val unauthorizedTopics = intercept[TopicAuthorizationException] {
-      (0 until 10).foreach(_ => consumers.head.poll(Duration.ofMillis(50L)))
+      (0 until 10).foreach(_ => consumer.poll(Duration.ofMillis(50L)))
     }.unauthorizedTopics
     assertEquals(Collections.singleton(newTopic), unauthorizedTopics)
 
@@ -858,7 +864,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(acls, resource)
 
     TestUtils.waitUntilTrue(() => {
-      this.consumers.head.poll(Duration.ofMillis(50L))
+      consumer.poll(Duration.ofMillis(50L))
       this.zkClient.topicExists(newTopic)
     }, "Expected topic was not created")
   }
@@ -875,7 +881,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val metadataRequest = new MetadataRequest.Builder(List(topic, createTopic).asJava, true).build()
     val metadataResponse = MetadataResponse.parse(connectAndSend(metadataRequest, ApiKeys.METADATA), ApiKeys.METADATA.latestVersion)
 
-    assertEquals(Set(topic).asJava, metadataResponse.topicsByError(Errors.NONE));
+    assertEquals(Set(topic).asJava, metadataResponse.topicsByError(Errors.NONE))
     assertEquals(Set(createTopic).asJava, metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED))
 
     val createAcls = topicCreateAcl.get(createTopicResource).get
@@ -890,60 +896,69 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test(expected = classOf[AuthorizationException])
   def testCommitWithNoAccess() {
-    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+    val consumer = createConsumer()
+    consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[KafkaException])
   def testCommitWithNoTopicAccess() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
-    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+    val consumer = createConsumer()
+    consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testCommitWithTopicWrite() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+    val consumer = createConsumer()
+    consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testCommitWithTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
-    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+    val consumer = createConsumer()
+    consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[GroupAuthorizationException])
   def testCommitWithNoGroupAccess() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
-    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+    val consumer = createConsumer()
+    consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test
   def testCommitWithTopicAndGroupRead() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
-    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
+    val consumer = createConsumer()
+    consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
   @Test(expected = classOf[AuthorizationException])
   def testOffsetFetchWithNoAccess() {
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.position(tp)
   }
 
   @Test(expected = classOf[GroupAuthorizationException])
   def testOffsetFetchWithNoGroupAccess() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.position(tp)
   }
 
   @Test(expected = classOf[KafkaException])
   def testOffsetFetchWithNoTopicAccess() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.position(tp)
   }
 
   @Test
@@ -951,8 +966,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val offset = 15L
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(offset)).asJava)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync(Map(tp -> new OffsetAndMetadata(offset)).asJava)
 
     removeAllAcls()
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
@@ -978,27 +994,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testOffsetFetchTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.position(tp)
   }
 
   @Test
   def testOffsetFetchWithTopicAndGroupRead() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.position(tp)
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
   def testListOffsetsWithNoTopicAccess() {
-    this.consumers.head.partitionsFor(topic)
+    val consumer = createConsumer()
+    consumer.partitionsFor(topic)
   }
 
   @Test
   def testListOffsetsWithTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
-    this.consumers.head.partitionsFor(topic)
+    val consumer = createConsumer()
+    consumer.partitionsFor(topic)
   }
 
   @Test
@@ -1031,15 +1051,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testListGroupApiWithAndWithoutListGroupAcls() {
     // write some record to the topic
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
-    sendRecords(1, tp)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 1, tp)
 
     // use two consumers to write to two different groups
     val group2 = "other group"
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Group, group2, LITERAL))
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
-    this.consumers.head.subscribe(Collections.singleton(topic))
-    consumeRecords(this.consumers.head)
+    val consumer = createConsumer()
+    consumer.subscribe(Collections.singleton(topic))
+    consumeRecords(consumer)
 
     val otherConsumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group2, securityProtocol = SecurityProtocol.PLAINTEXT)
     otherConsumer.subscribe(Collections.singleton(topic))
@@ -1074,8 +1096,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource)
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
     createAdminClient().deleteConsumerGroups(Seq(group).asJava).deletedGroups().get(group).get()
   }
 
@@ -1083,8 +1106,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testDeleteGroupApiWithNoDeleteGroupAcl() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
     val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
     TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
   }
@@ -1436,9 +1460,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     response
   }
 
-  private def sendRecords(numRecords: Int, tp: TopicPartition) {
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                          numRecords: Int,
+                          tp: TopicPartition) {
     val futures = (0 until numRecords).map { i =>
-      this.producers.head.send(new ProducerRecord(tp.topic(), tp.partition(), i.toString.getBytes, i.toString.getBytes))
+      producer.send(new ProducerRecord(tp.topic(), tp.partition(), i.toString.getBytes, i.toString.getBytes))
     }
     try {
       futures.foreach(_.get)
@@ -1481,21 +1507,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def buildTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
-    val transactionalProperties = new Properties()
-    transactionalProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
-    val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      props = Some(transactionalProperties))
-    producers += producer
-    producer
+    producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
+    producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+    createProducer()
   }
 
   private def buildIdempotentProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
-    val idempotentProperties = new Properties()
-    idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      props = Some(idempotentProperties))
-    producers += producer
-    producer
+    producerConfig.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+    createProducer()
   }
 
   private def createAdminClient(): AdminClient = {
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 59683b8..488874d 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -17,7 +17,6 @@ import java.util
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 import kafka.utils.ShutdownableThread
 import kafka.server.KafkaConfig
@@ -36,8 +35,6 @@ import org.apache.kafka.common.internals.Topic
 abstract class BaseConsumerTest extends IntegrationTestHarness {
 
   val epsilon = 0.1
-  val producerCount = 1
-  val consumerCount = 2
   val serverCount = 3
 
   val topic = "topic"
@@ -74,19 +71,21 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
   @Test
   def testSimpleConsumption() {
     val numRecords = 10000
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
 
-    assertEquals(0, this.consumers.head.assignment.size)
-    this.consumers.head.assign(List(tp).asJava)
-    assertEquals(1, this.consumers.head.assignment.size)
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment.size)
+    consumer.assign(List(tp).asJava)
+    assertEquals(1, consumer.assignment.size)
 
-    this.consumers.head.seek(tp, 0)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, startingOffset = 0)
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0)
 
     // check async commit callbacks
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers.head.commitAsync(commitCallback)
-    awaitCommitCallback(this.consumers.head, commitCallback)
+    consumer.commitAsync(commitCallback)
+    awaitCommitCallback(consumer, commitCallback)
   }
 
   @Test
@@ -94,20 +93,19 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     val listener = new TestConsumerReassignmentListener()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
-    consumer0.subscribe(List(topic).asJava, listener)
+    consumer.subscribe(List(topic).asJava, listener)
 
     // the initial subscription should cause a callback execution
-    consumer0.poll(2000)
+    consumer.poll(2000)
 
     assertEquals(1, listener.callsToAssigned)
 
     // get metadata for the topic
     var parts: Seq[PartitionInfo] = null
     while (parts == null)
-      parts = consumer0.partitionsFor(Topic.GROUP_METADATA_TOPIC_NAME).asScala
+      parts = consumer.partitionsFor(Topic.GROUP_METADATA_TOPIC_NAME).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts.head.leader())
 
@@ -115,7 +113,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     val coordinator = parts.head.leader().id()
     this.servers(coordinator).shutdown()
 
-    consumer0.poll(5000)
+    consumer.poll(5000)
 
     // the failover should not cause a rebalance
     assertEquals(1, listener.callsToAssigned)
@@ -137,12 +135,6 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     }
   }
 
-  protected def sendRecords(numRecords: Int): Seq[ProducerRecord[Array[Byte], Array[Byte]]] =
-    sendRecords(numRecords, tp)
-
-  protected def sendRecords(numRecords: Int, tp: TopicPartition): Seq[ProducerRecord[Array[Byte], Array[Byte]]] =
-    sendRecords(this.producers.head, numRecords, tp)
-
   protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
                             tp: TopicPartition): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
     val records = (0 until numRecords).map { i =>
@@ -302,9 +294,10 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
    */
   def isPartitionAssignmentValid(assignments: Buffer[Set[TopicPartition]],
                                  partitions: Set[TopicPartition]): Boolean = {
-    val allNonEmptyAssignments = assignments forall (assignment => assignment.nonEmpty)
+    val allNonEmptyAssignments = assignments.forall(assignment => assignment.nonEmpty)
     if (!allNonEmptyAssignments) {
       // at least one consumer got empty assignment
+      val uniqueAssignedPartitions = (Set[TopicPartition]() /: assignments) (_ ++ _)
       return false
     }
 
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index ad44425..7a2394e 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -70,9 +70,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
   protected def createProducer(brokerList: String,
                                lingerMs: Int = 0,
-                               props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
-    val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
-      saslProperties = clientSaslProperties, lingerMs = lingerMs, props = props)
+                               batchSize: Int = 16384,
+                               compressionType: String = "none"): KafkaProducer[Array[Byte],Array[Byte]] = {
+    val producer = TestUtils.createProducer(brokerList,
+      compressionType = compressionType,
+      securityProtocol = securityProtocol,
+      trustStoreFile = trustStoreFile,
+      saslProperties = clientSaslProperties,
+      lingerMs = lingerMs)
     registerProducer(producer)
   }
 
@@ -170,9 +175,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
   @Test
   def testSendCompressedMessageWithCreateTime() {
-    val producerProps = new Properties()
-    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
-    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps))
+    val producer = createProducer(brokerList = brokerList,
+      compressionType = "gzip",
+      lingerMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index b265182..7ac3c66 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -33,8 +33,6 @@ import scala.collection.JavaConverters._
 abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   override val serverCount = 2
-  val producerCount = 1
-  val consumerCount = 1
 
   protected def producerClientId = "QuotasTestProducer-1"
   protected def consumerClientId = "QuotasTestConsumer-1"
@@ -46,7 +44,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100")
   this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000")
   this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
-  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0")
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "-1")
   this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "300000")
   this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
@@ -79,7 +77,6 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
 
   @Test
   def testThrottledProducerConsumer() {
-
     val numRecords = 1000
     val produced = quotaTestClients.produceUntilThrottled(numRecords)
     quotaTestClients.verifyProduceThrottle(expectThrottle = true)
@@ -128,18 +125,17 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
 
     // Since producer may have been throttled after producing a couple of records,
     // consume from beginning till throttled
-    consumers.head.seekToBeginning(Collections.singleton(new TopicPartition(topic1, 0)))
+    quotaTestClients.consumer.seekToBeginning(Collections.singleton(new TopicPartition(topic1, 0)))
     quotaTestClients.consumeUntilThrottled(numRecords + produced)
     quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
   @Test
   def testThrottledRequest() {
-
     quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, 0.1)
     quotaTestClients.waitForQuotaUpdate(Long.MaxValue, Long.MaxValue, 0.1)
 
-    val consumer = consumers.head
+    val consumer = quotaTestClients.consumer
     consumer.subscribe(Collections.singleton(topic1))
     val endTimeMs = System.currentTimeMillis + 10000
     var throttled = false
@@ -167,10 +163,10 @@ abstract class QuotaTestClients(topic: String,
                                 leaderNode: KafkaServer,
                                 producerClientId: String,
                                 consumerClientId: String,
-                                producer: KafkaProducer[Array[Byte], Array[Byte]],
-                                consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+                                val producer: KafkaProducer[Array[Byte], Array[Byte]],
+                                val consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
 
-  def userPrincipal : KafkaPrincipal
+  def userPrincipal: KafkaPrincipal
   def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double)
   def removeQuotaOverrides()
 
@@ -230,9 +226,9 @@ abstract class QuotaTestClients(topic: String,
   def verifyThrottleTimeMetric(quotaType: QuotaType, clientId: String, expectThrottle: Boolean): Unit = {
     val throttleMetricValue = metricValue(throttleMetric(quotaType, clientId))
     if (expectThrottle) {
-      assertTrue("Should have been throttled", throttleMetricValue > 0)
+      assertTrue(s"Client with id=$clientId should have been throttled", throttleMetricValue > 0)
     } else {
-      assertEquals("Should not have been throttled", 0.0, throttleMetricValue, 0.0)
+      assertEquals(s"Client with id=$clientId should not have been throttled", 0.0, throttleMetricValue, 0.0)
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index b084b3c..ef8fb41 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -34,7 +34,10 @@ class ClientIdQuotaTest extends BaseQuotaTest {
   }
 
   override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
-    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producers.head, consumers.head) {
+    val producer = createProducer()
+    val consumer = createConsumer()
+
+    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producer, consumer) {
       override def userPrincipal: KafkaPrincipal = KafkaPrincipal.ANONYMOUS
       override def quotaMetricTags(clientId: String): Map[String, String] = {
         Map("user" -> "", "client-id" -> clientId)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 07cbf0c..dabe5a4 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -19,31 +19,19 @@ import java.util.{Collection, Collections, Properties}
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.Assert._
 import org.junit.{After, Before, Ignore, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.Buffer
-
 
 /**
  * Integration tests for the consumer that cover basic usage as well as server failures
  */
 class ConsumerBounceTest extends BaseRequestTest with Logging {
-
-  override def numBrokers: Int = 3
-
-  val producerCount = 1
-  val consumerCount = 2
-
-  val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-  val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
-
   val topic = "topic"
   val part = 0
   val tp = new TopicPartition(topic, part)
@@ -52,16 +40,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
   val gracefulCloseTimeMs = 1000
   val executor = Executors.newScheduledThreadPool(2)
 
-  val producerConfig = new Properties
-  val consumerConfig = new Properties
-  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
-  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
-  this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
-  this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000")
-  this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
-  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-  def serverConfig(): Properties = {
+  override def generateConfigs = {
     val properties = new Properties
     properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
     properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
@@ -69,48 +48,25 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
     properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
     properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
-    properties
-  }
 
-  override def generateConfigs = {
     FixedPortTestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
-      .map(KafkaConfig.fromProps(_, serverConfig))
+      .map(KafkaConfig.fromProps(_, properties))
   }
 
   @Before
   override def setUp() {
     super.setUp()
 
-    for (_ <- 0 until producerCount)
-      producers += createProducer
-
-    for (_ <- 0 until consumerCount)
-      consumers += createConsumer
-
     // create the test topic with all the brokers as replicas
     createTopic(topic, 1, numBrokers)
   }
 
-  def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-        securityProtocol = SecurityProtocol.PLAINTEXT,
-        props = Some(producerConfig))
-  }
-
-  def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
-    TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
-        securityProtocol = SecurityProtocol.PLAINTEXT,
-        props = Some(consumerConfig))
-  }
-
   @After
   override def tearDown() {
     try {
       executor.shutdownNow()
       // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
       assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
-      producers.foreach(_.close())
-      consumers.foreach(_.close())
     } finally {
       super.tearDown()
     }
@@ -126,11 +82,11 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
    */
   def consumeWithBrokerFailures(numIters: Int) {
     val numRecords = 1000
-    sendRecords(numRecords)
-    this.producers.foreach(_.close)
+    val producer = createProducer()
+    sendRecords(producer, numRecords)
 
     var consumed = 0L
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
 
     consumer.subscribe(Collections.singletonList(topic))
 
@@ -164,10 +120,10 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
 
   def seekAndCommitWithBrokerFailures(numIters: Int) {
     val numRecords = 1000
-    sendRecords(numRecords)
-    this.producers.foreach(_.close)
+    val producer = createProducer()
+    sendRecords(producer, numRecords)
 
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
     consumer.assign(Collections.singletonList(tp))
     consumer.seek(tp, 0)
 
@@ -203,19 +159,21 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     val numRecords = 1000
     val newtopic = "newtopic"
 
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
     consumer.subscribe(Collections.singleton(newtopic))
     executor.schedule(new Runnable {
         def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers)
       }, 2, TimeUnit.SECONDS)
     consumer.poll(0)
 
+    val producer = createProducer()
+
     def sendRecords(numRecords: Int, topic: String) {
       var remainingRecords = numRecords
       val endTimeMs = System.currentTimeMillis + 20000
       while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs) {
         val futures = (0 until remainingRecords).map { i =>
-          this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+          producer.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
         }
         futures.map { future =>
           try {
@@ -243,11 +201,11 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     future.get
   }
 
-
   @Test
   def testClose() {
     val numRecords = 10
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords)
 
     checkCloseGoodPath(numRecords, "group1")
     checkCloseWithCoordinatorFailure(numRecords, "group2", "group3")
@@ -296,7 +254,6 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     response.node().id()
   }
 
-
   /**
    * Consumer is closed while all brokers are unavailable. Cannot rebalance or commit offsets since
    * there is no coordinator, but close should timeout and return. If close is invoked with a very
@@ -357,7 +314,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     }
 
     def createConsumerToRebalance(): Future[Any] = {
-      val consumer = createConsumer(groupId)
+      val consumer = createConsumerWithGroupId(groupId)
       val rebalanceSemaphore = new Semaphore(0)
       val future = subscribeAndPoll(consumer, Some(rebalanceSemaphore))
       // Wait for consumer to poll and trigger rebalance
@@ -367,9 +324,9 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
       future
     }
 
-    val consumer1 = createConsumer(groupId)
+    val consumer1 = createConsumerWithGroupId(groupId)
     waitForRebalance(2000, subscribeAndPoll(consumer1))
-    val consumer2 = createConsumer(groupId)
+    val consumer2 = createConsumerWithGroupId(groupId)
     waitForRebalance(2000, subscribeAndPoll(consumer2), consumer1)
     val rebalanceFuture = createConsumerToRebalance()
 
@@ -392,15 +349,13 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     closeFuture2.get(2000, TimeUnit.MILLISECONDS)
   }
 
-  private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], Array[Byte]] = {
-    this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-    val consumer = createConsumer
-    consumers += consumer
-    consumer
+  private def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+    createConsumer()
   }
 
-  private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int) : KafkaConsumer[Array[Byte], Array[Byte]] = {
-    val consumer = createConsumer(groupId)
+  private def createConsumerAndReceive(groupId: String, manualAssign: Boolean, numRecords: Int): KafkaConsumer[Array[Byte], Array[Byte]] = {
+    val consumer = createConsumerWithGroupId(groupId)
     if (manualAssign)
       consumer.assign(Collections.singleton(tp))
     else
@@ -439,7 +394,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     // Check that close was graceful with offsets committed and leave group sent.
     // New instance of consumer should be assigned partitions immediately and should see committed offsets.
     val assignSemaphore = new Semaphore(0)
-    val consumer = createConsumer(groupId)
+    val consumer = createConsumerWithGroupId(groupId)
     consumer.subscribe(Collections.singletonList(topic),  new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: Collection[TopicPartition]) {
         assignSemaphore.release()
@@ -447,7 +402,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
       def onPartitionsRevoked(partitions: Collection[TopicPartition]) {
       }})
     consumer.poll(3000)
-    assertTrue("Assigment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS))
+    assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS))
     if (committedRecords > 0)
       assertEquals(committedRecords, consumer.committed(tp).offset)
     consumer.close()
@@ -470,12 +425,13 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     }
   }
 
-  private def sendRecords(numRecords: Int, topic: String = this.topic) {
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                          numRecords: Int,
+                          topic: String = this.topic) {
     val futures = (0 until numRecords).map { i =>
-      this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+      producer.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
     }
     futures.map(_.get)
   }
 
-
 }
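The ConsumerBounceTest changes above show the new shape of a test under this patch: clients are created inside the test body and registered with the harness, which closes them in tearDown(). A minimal sketch of that shape (hypothetical test body; identifiers follow the class above, and createProducer()/createConsumer() are the harness factories introduced below in IntegrationTestHarness):

    // Hypothetical sketch of the explicit-creation pattern. Every client
    // returned by createProducer()/createConsumer() is tracked by the
    // harness and closed in tearDown(), so the test body stays linear.
    @Test
    def exampleExplicitClients(): Unit = {
      val producer = createProducer()   // KafkaProducer[Array[Byte], Array[Byte]]
      sendRecords(producer, numRecords = 10)

      val consumer = createConsumer()   // KafkaConsumer[Array[Byte], Array[Byte]]
      consumer.assign(Collections.singletonList(tp))
      consumer.seek(tp, 0)
      TestUtils.consumeRecords(consumer, 10)
    }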
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index eb8f11d..fed5380 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -16,7 +16,7 @@ package kafka.api
 
 import java.io.File
 import java.{lang, util}
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.{Collections, Properties}
 
@@ -48,8 +48,6 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
   override protected def interBrokerListenerName: ListenerName = new ListenerName("BROKER")
 
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-  override val consumerCount: Int = 0
-  override val producerCount: Int = 0
   override val serverCount: Int = 2
 
   private val kafkaServerSaslMechanisms = Seq("SCRAM-SHA-256")
@@ -77,18 +75,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
 
     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG,
       ScramLoginModule(JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword).toString)
-    producerWithoutQuota = createProducer
-    producers += producerWithoutQuota
+    producerWithoutQuota = createProducer()
   }
 
   @After
   override def tearDown(): Unit = {
-    // Close producers and consumers without waiting for requests to complete
-    // to avoid waiting for throttled responses
-    producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
-    producers.clear()
-    consumers.foreach(_.close(0, TimeUnit.MILLISECONDS))
-    consumers.clear()
     adminClients.foreach(_.close())
     super.tearDown()
   }
@@ -211,7 +202,6 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
   }
 
   private def addUser(user: String, leader: Int): GroupedUser = {
-
     val password = s"$user:secret"
     createScramCredentials(zkConnect, user, password)
     servers.foreach { server =>
@@ -222,26 +212,25 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     val userGroup = group(user)
     val topic = s"${userGroup}_topic"
     val producerClientId = s"$user:producer-client-id"
-    val consumerClientId = s"$user:producer-client-id"
+    val consumerClientId = s"$user:consumer-client-id"
 
     producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)
     producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString)
-    val producer = createProducer
-    producers += producer
+    val producer = createProducer()
 
     consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId)
+    consumerConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
     consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, s"$user-group")
     consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString)
-    val consumer = createConsumer
-    consumers += consumer
+    val consumer = createConsumer()
 
     GroupedUser(user, userGroup, topic, servers(leader), producerClientId, consumerClientId, producer, consumer)
   }
 
   case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaServer,
                          producerClientId: String, consumerClientId: String,
-                         producer: KafkaProducer[Array[Byte], Array[Byte]],
-                         consumer: KafkaConsumer[Array[Byte], Array[Byte]]) extends
+                         override val producer: KafkaProducer[Array[Byte], Array[Byte]],
+                         override val consumer: KafkaConsumer[Array[Byte], Array[Byte]]) extends
     QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producer, consumer) {
 
     override def userPrincipal: KafkaPrincipal = GroupedUserPrincipal(user, userGroup)
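As in addUser above, per-user client settings are now applied by mutating the shared producerConfig/consumerConfig just before the create call; because createProducer()/createConsumer() copy the Properties into the client, later mutations do not leak into clients that already exist. A sketch of that pattern (user name and password are illustrative):

    // Illustrative values only; the sequence mirrors addUser above.
    val user = "user1"
    val password = s"$user:secret"
    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, s"$user:producer-client-id")
    producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, ScramLoginModule(user, password).toString)
    val producer = createProducer()   // snapshots producerConfig at this point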
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 7ea761f..d53a94d 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -55,8 +55,6 @@ import scala.collection.JavaConverters._
   * would end up with ZooKeeperTestHarness twice.
   */
 abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
-  override val producerCount = 1
-  override val consumerCount = 2
   override val serverCount = 3
 
   override def configureSecurityBeforeServersStart() {
@@ -187,21 +185,11 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     createTopic(topic, 1, 3)
   }
 
-  override def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    TestUtils.createProducer(brokerList,
-      maxBlockMs = 3000L,
-      securityProtocol = this.securityProtocol,
-      trustStoreFile = this.trustStoreFile,
-      saslProperties = this.clientSaslProperties,
-      props = Some(producerConfig))
-  }
-
   /**
     * Closes MiniKDC last when tearing down.
     */
   @After
   override def tearDown() {
-    consumers.foreach(_.wakeup())
     super.tearDown()
     closeSasl()
   }
@@ -212,31 +200,37 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   @Test
   def testProduceConsumeViaAssign(): Unit = {
     setAclsAndProduce(tp)
-    consumers.head.assign(List(tp).asJava)
-    consumeRecords(this.consumers.head, numRecords)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumeRecords(consumer, numRecords)
   }
 
   @Test
   def testProduceConsumeViaSubscribe(): Unit = {
     setAclsAndProduce(tp)
-    consumers.head.subscribe(List(topic).asJava)
-    consumeRecords(this.consumers.head, numRecords)
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    consumeRecords(consumer, numRecords)
   }
 
   @Test
   def testProduceConsumeWithWildcardAcls(): Unit = {
     setWildcardResourceAcls()
-    sendRecords(numRecords, tp)
-    consumers.head.subscribe(List(topic).asJava)
-    consumeRecords(this.consumers.head, numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    consumeRecords(consumer, numRecords)
   }
 
   @Test
   def testProduceConsumeWithPrefixedAcls(): Unit = {
     setPrefixedResourceAcls()
-    sendRecords(numRecords, tp)
-    consumers.head.subscribe(List(topic).asJava)
-    consumeRecords(this.consumers.head, numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    consumeRecords(consumer, numRecords)
   }
 
   @Test
@@ -244,8 +238,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // topic2 is not created on setup()
     val tp2 = new TopicPartition("topic2", 0)
     setAclsAndProduce(tp2)
-    consumers.head.assign(List(tp2).asJava)
-    consumeRecords(this.consumers.head, numRecords, topic = tp2.topic)
+    val consumer = createConsumer()
+    consumer.assign(List(tp2).asJava)
+    consumeRecords(consumer, numRecords, topic = tp2.topic)
   }
 
   private def setWildcardResourceAcls() {
@@ -271,7 +266,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, new Resource(Topic, tp.topic))
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
     }
-    sendRecords(numRecords, tp)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
   }
 
   /**
@@ -280,7 +276,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     */
   @Test(expected = classOf[TopicAuthorizationException])
   def testNoProduceWithoutDescribeAcl(): Unit = {
-    sendRecords(numRecords, tp)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
   }
 
   @Test
@@ -290,7 +287,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
     }
     try{
-      sendRecords(numRecords, tp)
+      val producer = createProducer()
+      sendRecords(producer, numRecords, tp)
       fail("exception expected")
     } catch {
       case e: TopicAuthorizationException =>
@@ -305,17 +303,19 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   @Test(expected = classOf[KafkaException])
   def testNoConsumeWithoutDescribeAclViaAssign(): Unit = {
     noConsumeWithoutDescribeAclSetup()
-    consumers.head.assign(List(tp).asJava)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
     // the exception is expected when the consumer attempts to lookup offsets
-    consumeRecords(this.consumers.head)
+    consumeRecords(consumer)
   }
   
   @Test(expected = classOf[TopicAuthorizationException])
   def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
     noConsumeWithoutDescribeAclSetup()
-    consumers.head.subscribe(List(topic).asJava)
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
     // this should timeout since the consumer will not be able to fetch any metadata for the topic
-    consumeRecords(this.consumers.head, timeout = 3000)
+    consumeRecords(consumer, timeout = 3000)
   }
   
   private def noConsumeWithoutDescribeAclSetup(): Unit = {
@@ -326,7 +326,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
     }
 
-    sendRecords(numRecords, tp)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
 
     AclCommand.main(deleteDescribeAclArgs)
     AclCommand.main(deleteWriteAclArgs)
@@ -338,10 +339,11 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   @Test
   def testNoConsumeWithDescribeAclViaAssign(): Unit = {
     noConsumeWithDescribeAclSetup()
-    consumers.head.assign(List(tp).asJava)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
 
     try {
-      consumeRecords(this.consumers.head)
+      consumeRecords(consumer)
       fail("Topic authorization exception expected")
     } catch {
       case e: TopicAuthorizationException =>
@@ -352,10 +354,11 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   @Test
   def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
     noConsumeWithDescribeAclSetup()
-    consumers.head.subscribe(List(topic).asJava)
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
 
     try {
-      consumeRecords(this.consumers.head)
+      consumeRecords(consumer)
       fail("Topic authorization exception expected")
     } catch {
       case e: TopicAuthorizationException =>
@@ -370,7 +373,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
     }
-    sendRecords(numRecords, tp)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
   }
 
   /**
@@ -383,10 +387,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
     }
-    sendRecords(numRecords, tp)
-    consumers.head.assign(List(tp).asJava)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
     try {
-      consumeRecords(this.consumers.head)
+      consumeRecords(consumer)
       fail("Topic authorization exception expected")
     } catch {
       case e: GroupAuthorizationException =>
@@ -394,11 +401,12 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     }
   }
 
-  protected final def sendRecords(numRecords: Int, tp: TopicPartition) {
+  protected final def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                                  numRecords: Int, tp: TopicPartition) {
     val futures = (0 until numRecords).map { i =>
       val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
       debug(s"Sending this record: $record")
-      this.producers.head.send(record)
+      producer.send(record)
     }
     try {
       futures.foreach(_.get)
@@ -408,11 +416,11 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   }
 
   protected final def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
-                             numRecords: Int = 1,
-                             startingOffset: Int = 0,
-                             topic: String = topic,
-                             part: Int = part,
-                             timeout: Long = 10000) {
+                                     numRecords: Int = 1,
+                                     startingOffset: Int = 0,
+                                     topic: String = topic,
+                                     part: Int = part,
+                                     timeout: Long = 10000) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
 
     val deadlineMs = System.currentTimeMillis() + timeout
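With the producer passed explicitly to sendRecords, the authorization-failure tests above reduce to "create, act, expect", and the harness still cleans up the client even when the send fails. A sketch mirroring testNoProduceWithoutDescribeAcl (hypothetical test name):

    // Sketch: no pre-built client, no special cleanup; the expected
    // exception comes from the missing Describe/Write ACLs.
    @Test(expected = classOf[TopicAuthorizationException])
    def exampleProduceDeniedWithoutAcls(): Unit = {
      val producer = createProducer()
      sendRecords(producer, numRecords, tp)
    }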
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index b0e9570..158fe57 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -26,7 +26,6 @@ import java.util.Properties
 
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.security.auth.SecurityProtocol
 
 class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
   val offsetsTopicCompressionCodec = CompressionType.GZIP
@@ -40,8 +39,7 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
 
   @Test
   def testGroupCoordinatorPropagatesOfffsetsTopicCompressionCodec() {
-    val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
-                                               securityProtocol = SecurityProtocol.PLAINTEXT)
+    val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers))
     val offsetMap = Map(
       new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "")
     ).asJava
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 4601417..0e2797a 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -17,39 +17,40 @@
 
 package kafka.api
 
-import org.apache.kafka.clients.producer.ProducerConfig
+import java.time.Duration
+
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import kafka.utils.TestUtils
 import kafka.utils.Implicits._
 import java.util.Properties
+import java.util.concurrent.TimeUnit
 
-import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
 import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
 import org.junit.{After, Before}
 
-import scala.collection.mutable.Buffer
+import scala.collection.mutable
 
 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
  */
 abstract class IntegrationTestHarness extends KafkaServerTestHarness {
+  protected def serverCount: Int
+  protected def logDirCount: Int = 1
 
-  val producerCount: Int
-  val consumerCount: Int
-  val serverCount: Int
-  var logDirCount: Int = 1
-  lazy val producerConfig = new Properties
-  lazy val consumerConfig = new Properties
-  lazy val serverConfig = new Properties
+  val producerConfig = new Properties
+  val consumerConfig = new Properties
+  val serverConfig = new Properties
 
-  val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-  val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  private val consumers = mutable.Buffer[KafkaConsumer[_, _]]()
+  private val producers = mutable.Buffer[KafkaProducer[_, _]]()
 
   protected def interBrokerListenerName: ListenerName = listenerName
 
-  override def generateConfigs = {
+  override def generateConfigs: Seq[KafkaConfig] = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
     cfgs.foreach { config =>
@@ -69,22 +70,29 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   @Before
   override def setUp() {
-    val producerSecurityProps = clientSecurityProps("producer")
-    val consumerSecurityProps = clientSecurityProps("consumer")
+    doSetup(createOffsetsTopic = true)
+  }
+
+  def doSetup(createOffsetsTopic: Boolean): Unit = {
+    // Generate client security properties before starting the brokers in case certs are needed
+    producerConfig ++= clientSecurityProps("producer")
+    consumerConfig ++= clientSecurityProps("consumer")
+
     super.setUp()
-    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    producerConfig ++= producerSecurityProps
-    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerConfig ++= consumerSecurityProps
-    for (_ <- 0 until producerCount)
-      producers += createProducer
-    for (_ <- 0 until consumerCount) {
-      consumers += createConsumer
-    }
 
-    TestUtils.createOffsetsTopic(zkClient, servers)
+    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerConfig.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1")
+    producerConfig.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+    producerConfig.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+
+    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerConfig.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group")
+    consumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
+    consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
+
+    if (createOffsetsTopic)
+      TestUtils.createOffsetsTopic(zkClient, servers)
   }
 
   def clientSecurityProps(certAlias: String): Properties = {
@@ -92,26 +100,35 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       clientSaslProperties)
   }
 
-  def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    TestUtils.createProducer(brokerList,
-      securityProtocol = this.securityProtocol,
-      trustStoreFile = this.trustStoreFile,
-      saslProperties = this.clientSaslProperties,
-      props = Some(producerConfig))
+  def createProducer[K, V](keySerializer: Serializer[K] = new ByteArraySerializer,
+                           valueSerializer: Serializer[V] = new ByteArraySerializer,
+                           configOverrides: Properties = new Properties): KafkaProducer[K, V] = {
+    val props = new Properties
+    props ++= producerConfig
+    props ++= configOverrides
+    val producer = new KafkaProducer[K, V](props, keySerializer, valueSerializer)
+    producers += producer
+    producer
   }
 
-  def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
-    TestUtils.createConsumer(brokerList,
-      securityProtocol = this.securityProtocol,
-      trustStoreFile = this.trustStoreFile,
-      saslProperties = this.clientSaslProperties,
-      props = Some(consumerConfig))
+  def createConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
+                           valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
+                           configOverrides: Properties = new Properties): KafkaConsumer[K, V] = {
+    val props = new Properties
+    props ++= consumerConfig
+    props ++= configOverrides
+    val consumer = new KafkaConsumer[K, V](props, keyDeserializer, valueDeserializer)
+    consumers += consumer
+    consumer
   }
 
   @After
   override def tearDown() {
-    producers.foreach(_.close())
-    consumers.foreach(_.close())
+    producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
+    consumers.foreach(_.wakeup())
+    consumers.foreach(_.close(Duration.ZERO))
+    producers.clear()
+    consumers.clear()
     super.tearDown()
   }
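The new factory methods above are generic in the key/value types and accept per-call overrides, so a test no longer has to construct KafkaProducer/KafkaConsumer by hand to deviate from the byte-array defaults. A usage sketch from a hypothetical IntegrationTestHarness subclass (the String (de)serializers and the linger override are illustrative, not part of this patch):

    // Sketch: type parameters are inferred from the (de)serializers, and
    // per-test settings go through configOverrides rather than mutating
    // the shared Properties.
    import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

    val overrides = new Properties
    overrides.setProperty(ProducerConfig.LINGER_MS_CONFIG, "50")

    val producer = createProducer(new StringSerializer, new StringSerializer, overrides)
    // producer: KafkaProducer[String, String]

    val consumer = createConsumer(new StringDeserializer, new StringDeserializer)
    // consumer: KafkaConsumer[String, String]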
 
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 115ec05..f1eca54 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -79,13 +79,14 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testOffsetsForTimesWhenOffsetNotFound() {
-    val consumer = consumers.head
+    val consumer = createConsumer()
     assertNull(consumer.offsetsForTimes(Map(tp -> JLong.valueOf(0L)).asJava).get(tp))
   }
 
   @Test
   def testListGroups() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
+    val consumer = createConsumer()
+    subscribeAndWaitForAssignment(topic, consumer)
 
     val groups = client.listAllGroupsFlattened
     assertFalse(groups.isEmpty)
@@ -96,7 +97,8 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testListAllBrokerVersionInfo() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
+    val consumer = createConsumer()
+    subscribeAndWaitForAssignment(topic, consumer)
 
     val brokerVersionInfos = client.listAllBrokerVersionInfo
     val brokers = brokerList.split(",")
@@ -111,7 +113,8 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testGetConsumerGroupSummary() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
+    val consumer = createConsumer()
+    subscribeAndWaitForAssignment(topic, consumer)
 
     val group = client.describeConsumerGroup(groupId)
     assertEquals("range", group.assignmentStrategy)
@@ -126,7 +129,8 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testDescribeConsumerGroup() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
+    val consumer = createConsumer()
+    subscribeAndWaitForAssignment(topic, consumer)
 
     val consumerGroupSummary = client.describeConsumerGroup(groupId)
     assertEquals(1, consumerGroupSummary.consumers.get.size)
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index eaa4a23..c36a3f1 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -53,7 +53,7 @@ class LogAppendTimeTest extends IntegrationTestHarness {
 
   @Test
   def testProduceConsume() {
-    val producer = producers.head
+    val producer = createProducer()
     val now = System.currentTimeMillis()
     val createTime = now - TimeUnit.DAYS.toMillis(1)
     val producerRecords = (1 to 10).map(i => new ProducerRecord(topic, null, createTime, s"key$i".getBytes,
@@ -64,7 +64,7 @@ class LogAppendTimeTest extends IntegrationTestHarness {
       assertTrue(recordMetadata.timestamp < now + TimeUnit.SECONDS.toMillis(60))
     }
 
-    val consumer = consumers.head
+    val consumer = createConsumer()
     consumer.subscribe(Collections.singleton(topic))
     val consumerRecords = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
     TestUtils.waitUntilTrue(() => {
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 494bcce..814754d 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -19,6 +19,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{JaasTestUtils, TestUtils}
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, Histogram, Meter}
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import org.apache.kafka.common.config.SaslConfigs
@@ -32,8 +33,6 @@ import scala.collection.JavaConverters._
 
 class MetricsTest extends IntegrationTestHarness with SaslSetup {
 
-  override val producerCount = 1
-  override val consumerCount = 1
   override val serverCount = 1
 
   override protected def listenerName = new ListenerName("CLIENT")
@@ -80,17 +79,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     // Produce and consume some records
     val numRecords = 10
     val recordSize = 100000
-    val producer = producers.head
+    val producer = createProducer()
     sendRecords(producer, numRecords, recordSize, tp)
 
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     consumer.seek(tp, 0)
     TestUtils.consumeRecords(consumer, numRecords)
 
-    verifyKafkaRateMetricsHaveCumulativeCount()
+    verifyKafkaRateMetricsHaveCumulativeCount(producer, consumer)
     verifyClientVersionMetrics(consumer.metrics, "Consumer")
-    verifyClientVersionMetrics(this.producers.head.metrics, "Producer")
+    verifyClientVersionMetrics(producer.metrics, "Producer")
 
     val server = servers.head
     verifyBrokerMessageConversionMetrics(server, recordSize, tp)
@@ -112,16 +111,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
 
   // Create a producer that fails authentication to verify authentication failure metrics
   private def generateAuthenticationFailure(tp: TopicPartition): Unit = {
-    val producerProps = new Properties()
     val saslProps = new Properties()
      // Temporary limit to reduce blocking before KIP-152 client-side changes are merged
-    saslProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")
-    saslProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000")
     saslProps.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256")
     // Use acks=0 to verify error metric when connection is closed without a response
-    saslProps.put(ProducerConfig.ACKS_CONFIG, "0")
-    val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol,
-        trustStoreFile = trustStoreFile, saslProperties = Some(saslProps), props = Some(producerProps))
+    val producer = TestUtils.createProducer(brokerList,
+      acks = 0,
+      requestTimeoutMs = 1000,
+      maxBlockMs = 1000,
+      securityProtocol = securityProtocol,
+      trustStoreFile = trustStoreFile,
+      saslProperties = Some(saslProps))
 
     try {
       producer.send(new ProducerRecord(tp.topic, tp.partition, "key".getBytes, "value".getBytes)).get
@@ -132,7 +132,8 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     }
   }
 
-  private def verifyKafkaRateMetricsHaveCumulativeCount(): Unit =  {
+  private def verifyKafkaRateMetricsHaveCumulativeCount(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                                                        consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit =  {
 
     def exists(name: String, rateMetricName: MetricName, allMetricNames: Set[MetricName]): Boolean = {
       allMetricNames.contains(new MetricName(name, rateMetricName.group, "", rateMetricName.tags))
@@ -146,12 +147,10 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
           totalExists || totalTimeExists)
     }
 
-    val consumer = this.consumers.head
     val consumerMetricNames = consumer.metrics.keySet.asScala.toSet
     consumerMetricNames.filter(_.name.endsWith("-rate"))
         .foreach(verify(_, consumerMetricNames))
 
-    val producer = this.producers.head
     val producerMetricNames = producer.metrics.keySet.asScala.toSet
     val producerExclusions = Set("compression-rate") // compression-rate is an Average metric, not Rate
     producerMetricNames.filter(_.name.endsWith("-rate"))
@@ -198,7 +197,6 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
         tempBytes >= recordSize)
 
     verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec")
-
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0)
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0)
@@ -230,7 +228,8 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=NONE")
 
     try {
-      consumers.head.partitionsFor("12{}!")
+      val consumer = createConsumer()
+      consumer.partitionsFor("12{}!")
     } catch {
       case _: InvalidTopicException => // expected
     }
@@ -242,7 +241,8 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     assertTrue(s"Too many error metrics $currentErrorMetricCount" , currentErrorMetricCount < 10)
 
     // Verify that error metric is updated with producer acks=0 when no response is sent
-    sendRecords(producers.head, 1, 100, new TopicPartition("non-existent", 0))
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 1, recordSize = 100, new TopicPartition("non-existent", 0))
     verifyYammerMetricRecorded(s"$errorMetricPrefix,request=Metadata,error=LEADER_NOT_AVAILABLE")
   }
 
@@ -251,7 +251,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     val matchingMetrics = metrics.asScala.filter {
       case (metricName, _) => metricName.name == name && group.forall(_ == metricName.group)
     }
-    assertTrue(s"Metric not found $name", matchingMetrics.size > 0)
+    assertTrue(s"Metric not found $name", matchingMetrics.nonEmpty)
     verify(matchingMetrics.values)
   }
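For clients that must bypass the harness entirely, here a producer that deliberately fails SCRAM authentication, MetricsTest still calls TestUtils.createProducer directly, with the named parameters replacing the old Properties plumbing; the caller remains responsible for closing it. A condensed sketch of generateAuthenticationFailure above:

    // Condensed from generateAuthenticationFailure; saslProps selects the
    // client's SASL mechanism, and the authentication failure itself comes
    // from the surrounding test setup. Not harness-tracked, so close it.
    val saslProps = new Properties()
    saslProps.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256")
    val producer = TestUtils.createProducer(brokerList,
      acks = 0,
      requestTimeoutMs = 1000,
      maxBlockMs = 1000,
      securityProtocol = securityProtocol,
      trustStoreFile = trustStoreFile,
      saslProperties = Some(saslProps))
    try {
      producer.send(new ProducerRecord(tp.topic, tp.partition, "key".getBytes, "value".getBytes)).get()
    } catch {
      case _: Exception => // the authentication failure is the expected outcome
    } finally {
      producer.close()
    }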
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 0c8c771..983b63c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -46,14 +46,16 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     record.headers().add("headerKey", "headerValue".getBytes)
 
-    this.producers.head.send(record)
+    val producer = createProducer()
+    producer.send(record)
 
-    assertEquals(0, this.consumers.head.assignment.size)
-    this.consumers.head.assign(List(tp).asJava)
-    assertEquals(1, this.consumers.head.assignment.size)
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment.size)
+    consumer.assign(List(tp).asJava)
+    assertEquals(1, consumer.assignment.size)
 
-    this.consumers.head.seek(tp, 0)
-    val records = consumeRecords(consumer = this.consumers.head, numRecords = numRecords)
+    consumer.seek(tp, 0)
+    val records = consumeRecords(consumer = consumer, numRecords = numRecords)
 
     assertEquals(numRecords, records.size)
 
@@ -111,19 +113,20 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     }
 
-    val producer0 = new KafkaProducer(this.producerConfig, new ByteArraySerializer(), extendedSerializer)
-    producers += producer0
-    producer0.send(record)
-
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), extendedDeserializer)
-    consumers += consumer0
+    val producer = createProducer(
+      keySerializer = new ByteArraySerializer,
+      valueSerializer = extendedSerializer)
+    producer.send(record)
 
-    assertEquals(0, consumer0.assignment.size)
-    consumer0.assign(List(tp).asJava)
-    assertEquals(1, consumer0.assignment.size)
+    val consumer = createConsumer(
+      keyDeserializer = new ByteArrayDeserializer,
+      valueDeserializer = extendedDeserializer)
+    assertEquals(0, consumer.assignment.size)
+    consumer.assign(List(tp).asJava)
+    assertEquals(1, consumer.assignment.size)
 
-    consumer0.seek(tp, 0)
-    val records = consumeRecords(consumer = consumer0, numRecords = numRecords)
+    consumer.seek(tp, 0)
+    val records = consumeRecords(consumer = consumer, numRecords = numRecords)
 
     assertEquals(numRecords, records.size)
   }
@@ -133,16 +136,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val maxPollRecords = 2
     val numRecords = 10000
 
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
 
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
-
-    consumer0.assign(List(tp).asJava)
-
-    consumeAndVerifyRecords(consumer0, numRecords = numRecords, startingOffset = 0,
-      maxPollRecords = maxPollRecords)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer, numRecords = numRecords, startingOffset = 0, maxPollRecords = maxPollRecords)
   }
 
   @Test
@@ -151,21 +151,20 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString)
 
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
     val listener = new TestConsumerReassignmentListener()
-    consumer0.subscribe(List(topic).asJava, listener)
+    consumer.subscribe(List(topic).asJava, listener)
 
     // poll once to get the initial assignment
-    consumer0.poll(0)
+    consumer.poll(0)
     assertEquals(1, listener.callsToAssigned)
     assertEquals(1, listener.callsToRevoked)
 
     Thread.sleep(3500)
 
     // we should fall out of the group and need to rebalance
-    consumer0.poll(0)
+    consumer.poll(0)
     assertEquals(2, listener.callsToAssigned)
     assertEquals(2, listener.callsToRevoked)
   }
@@ -177,9 +176,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)
 
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
-
+    val consumer = createConsumer()
     var commitCompleted = false
     var committedPosition: Long = -1
 
@@ -190,22 +187,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
           // than session timeout and then try a commit. We should still be in the group,
           // so the commit should succeed
           Utils.sleep(1500)
-          committedPosition = consumer0.position(tp)
-          consumer0.commitSync(Map(tp -> new OffsetAndMetadata(committedPosition)).asJava)
+          committedPosition = consumer.position(tp)
+          consumer.commitSync(Map(tp -> new OffsetAndMetadata(committedPosition)).asJava)
           commitCompleted = true
         }
         super.onPartitionsRevoked(partitions)
       }
     }
 
-    consumer0.subscribe(List(topic).asJava, listener)
+    consumer.subscribe(List(topic).asJava, listener)
 
     // poll once to join the group and get the initial assignment
-    consumer0.poll(0)
+    consumer.poll(0)
 
     // force a rebalance to trigger an invocation of the revocation callback while in the group
-    consumer0.subscribe(List("otherTopic").asJava, listener)
-    consumer0.poll(0)
+    consumer.subscribe(List("otherTopic").asJava, listener)
+    consumer.poll(0)
 
     assertEquals(0, committedPosition)
     assertTrue(commitCompleted)
@@ -218,9 +215,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString)
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)
 
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
-
+    val consumer = createConsumer()
     val listener = new TestConsumerReassignmentListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
         // sleep longer than the session timeout, we should still be in the group after invocation
@@ -228,13 +223,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         super.onPartitionsAssigned(partitions)
       }
     }
-    consumer0.subscribe(List(topic).asJava, listener)
+    consumer.subscribe(List(topic).asJava, listener)
 
     // poll once to join the group and get the initial assignment
-    consumer0.poll(0)
+    consumer.poll(0)
 
     // we should still be in the group after this invocation
-    consumer0.poll(0)
+    consumer.poll(0)
 
     assertEquals(1, listener.callsToAssigned)
     assertEquals(1, listener.callsToRevoked)
@@ -243,71 +238,81 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testAutoCommitOnClose() {
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    val consumer = createConsumer()
 
     val numRecords = 10000
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
 
-    consumer0.subscribe(List(topic).asJava)
+    consumer.subscribe(List(topic).asJava)
 
     val assignment = Set(tp, tp2)
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == assignment.asJava
-    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer.assignment()}")
 
     // should auto-commit seeked positions before closing
-    consumer0.seek(tp, 300)
-    consumer0.seek(tp2, 500)
-    consumer0.close()
+    consumer.seek(tp, 300)
+    consumer.seek(tp2, 500)
+    consumer.close()
 
     // now we should see the committed positions from another consumer
-    assertEquals(300, this.consumers.head.committed(tp).offset)
-    assertEquals(500, this.consumers.head.committed(tp2).offset)
+    val anotherConsumer = createConsumer()
+    assertEquals(300, anotherConsumer.committed(tp).offset)
+    assertEquals(500, anotherConsumer.committed(tp2).offset)
   }
 
   @Test
   def testAutoCommitOnCloseAfterWakeup() {
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    val consumer = createConsumer()
 
     val numRecords = 10000
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
 
-    consumer0.subscribe(List(topic).asJava)
+    consumer.subscribe(List(topic).asJava)
 
     val assignment = Set(tp, tp2)
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == assignment.asJava
-    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer.assignment()}")
 
     // should auto-commit seeked positions before closing
-    consumer0.seek(tp, 300)
-    consumer0.seek(tp2, 500)
+    consumer.seek(tp, 300)
+    consumer.seek(tp2, 500)
 
     // wakeup the consumer before closing to simulate trying to break a poll
     // loop from another thread
-    consumer0.wakeup()
-    consumer0.close()
+    consumer.wakeup()
+    consumer.close()
 
     // now we should see the committed positions from another consumer
-    assertEquals(300, this.consumers.head.committed(tp).offset)
-    assertEquals(500, this.consumers.head.committed(tp2).offset)
+    val anotherConsumer = createConsumer()
+    assertEquals(300, anotherConsumer.committed(tp).offset)
+    assertEquals(500, anotherConsumer.committed(tp2).offset)
   }
 
   @Test
   def testAutoOffsetReset() {
-    sendRecords(1)
-    this.consumers.head.assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 1, startingOffset = 0)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 1, tp)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
   }
 
   @Test
   def testGroupConsumption() {
-    sendRecords(10)
-    this.consumers.head.subscribe(List(topic).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 1, startingOffset = 0)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 10, tp)
+
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
   }
 
   /**
@@ -322,28 +327,30 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testPatternSubscription() {
     val numRecords = 10000
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
 
     val topic1 = "tblablac" // matches subscribed pattern
     createTopic(topic1, 2, serverCount)
-    sendRecords(1000, new TopicPartition(topic1, 0))
-    sendRecords(1000, new TopicPartition(topic1, 1))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
 
     val topic2 = "tblablak" // does not match subscribed pattern
     createTopic(topic2, 2, serverCount)
-    sendRecords(1000, new TopicPartition(topic2, 0))
-    sendRecords(1000, new TopicPartition(topic2, 1))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1))
 
     val topic3 = "tblab1" // does not match subscribed pattern
     createTopic(topic3, 2, serverCount)
-    sendRecords(1000, new TopicPartition(topic3, 0))
-    sendRecords(1000, new TopicPartition(topic3, 1))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1))
 
-    assertEquals(0, this.consumers.head.assignment().size)
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
 
     val pattern = Pattern.compile("t.*c")
-    this.consumers.head.subscribe(pattern, new TestConsumerReassignmentListener)
-    this.consumers.head.poll(50)
+    consumer.subscribe(pattern, new TestConsumerReassignmentListener)
+    consumer.poll(50)
 
     var subscriptions = Set(
       new TopicPartition(topic, 0),
@@ -352,27 +359,26 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       new TopicPartition(topic1, 1))
 
     TestUtils.waitUntilTrue(() => {
-      this.consumers.head.poll(50)
-      this.consumers.head.assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
 
     val topic4 = "tsomec" // matches subscribed pattern
     createTopic(topic4, 2, serverCount)
-    sendRecords(1000, new TopicPartition(topic4, 0))
-    sendRecords(1000, new TopicPartition(topic4, 1))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1))
 
     subscriptions ++= Set(
       new TopicPartition(topic4, 0),
       new TopicPartition(topic4, 1))
 
-
     TestUtils.waitUntilTrue(() => {
-      this.consumers.head.poll(50)
-      this.consumers.head.assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
 
-    this.consumers.head.unsubscribe()
-    assertEquals(0, this.consumers.head.assignment().size)
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
   }
 
   /**
@@ -387,23 +393,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testSubsequentPatternSubscription() {
     this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
     val numRecords = 10000
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = numRecords, tp)
 
     // the first topic ('topic')  matches first subscription pattern only
 
     val fooTopic = "foo" // matches both subscription patterns
     createTopic(fooTopic, 1, serverCount)
-    sendRecords(1000, new TopicPartition(fooTopic, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0))
 
-    assertEquals(0, consumer0.assignment().size)
+    assertEquals(0, consumer.assignment().size)
 
     val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this
-    consumer0.subscribe(pattern1, new TestConsumerReassignmentListener)
-    consumer0.poll(50)
+    consumer.subscribe(pattern1, new TestConsumerReassignmentListener)
+    consumer.poll(50)
 
     var subscriptions = Set(
       new TopicPartition(topic, 0),
@@ -411,17 +417,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       new TopicPartition(fooTopic, 0))
 
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer0.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
 
     val barTopic = "bar" // matches the next subscription pattern
     createTopic(barTopic, 1, serverCount)
-    sendRecords(1000, new TopicPartition(barTopic, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(barTopic, 0))
 
     val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this
-    consumer0.subscribe(pattern2, new TestConsumerReassignmentListener)
-    consumer0.poll(50)
+    consumer.subscribe(pattern2, new TestConsumerReassignmentListener)
+    consumer.poll(50)
 
     subscriptions --= Set(
       new TopicPartition(topic, 0),
@@ -431,12 +437,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       new TopicPartition(barTopic, 0))
 
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer0.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
 
-    consumer0.unsubscribe()
-    assertEquals(0, consumer0.assignment().size)
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
   }
 
   /**
@@ -450,17 +456,19 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testPatternUnsubscription() {
     val numRecords = 10000
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
 
     val topic1 = "tblablac" // matches the subscription pattern
     createTopic(topic1, 2, serverCount)
-    sendRecords(1000, new TopicPartition(topic1, 0))
-    sendRecords(1000, new TopicPartition(topic1, 1))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
 
-    assertEquals(0, this.consumers.head.assignment().size)
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
 
-    this.consumers.head.subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
-    this.consumers.head.poll(50)
+    consumer.subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener)
+    consumer.poll(50)
 
     val subscriptions = Set(
       new TopicPartition(topic, 0),
@@ -469,39 +477,40 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       new TopicPartition(topic1, 1))
 
     TestUtils.waitUntilTrue(() => {
-      this.consumers.head.poll(50)
-      this.consumers.head.assignment() == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
 
-    this.consumers.head.unsubscribe()
-    assertEquals(0, this.consumers.head.assignment().size)
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
   }
 
   @Test
   def testCommitMetadata() {
-    this.consumers.head.assign(List(tp).asJava)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
 
     // sync commit
     val syncMetadata = new OffsetAndMetadata(5, "foo")
-    this.consumers.head.commitSync(Map((tp, syncMetadata)).asJava)
-    assertEquals(syncMetadata, this.consumers.head.committed(tp))
+    consumer.commitSync(Map((tp, syncMetadata)).asJava)
+    assertEquals(syncMetadata, consumer.committed(tp))
 
     // async commit
     val asyncMetadata = new OffsetAndMetadata(10, "bar")
     val callback = new CountConsumerCommitCallback
-    this.consumers.head.commitAsync(Map((tp, asyncMetadata)).asJava, callback)
-    awaitCommitCallback(this.consumers.head, callback)
-    assertEquals(asyncMetadata, this.consumers.head.committed(tp))
+    consumer.commitAsync(Map((tp, asyncMetadata)).asJava, callback)
+    awaitCommitCallback(consumer, callback)
+    assertEquals(asyncMetadata, consumer.committed(tp))
 
     // handle null metadata
     val nullMetadata = new OffsetAndMetadata(5, null)
-    this.consumers.head.commitSync(Map((tp, nullMetadata)).asJava)
-    assertEquals(nullMetadata, this.consumers.head.committed(tp))
+    consumer.commitSync(Map((tp, nullMetadata)).asJava)
+    assertEquals(nullMetadata, consumer.committed(tp))
   }
 
   @Test
   def testAsyncCommit() {
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     consumer.poll(0)
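+    // an initial poll gives the consumer a chance to find the group coordinator
+    // before the commits below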
 
@@ -519,18 +528,19 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val otherTopic = "other"
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
     val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
-    this.consumers.head.subscribe(List(topic).asJava)
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
     TestUtils.waitUntilTrue(() => {
-      this.consumers.head.poll(50)
-      this.consumers.head.assignment == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment}")
+      consumer.poll(50)
+      consumer.assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment}")
 
     createTopic(otherTopic, 2, serverCount)
-    this.consumers.head.subscribe(List(topic, otherTopic).asJava)
+    consumer.subscribe(List(topic, otherTopic).asJava)
     TestUtils.waitUntilTrue(() => {
-      this.consumers.head.poll(50)
-      this.consumers.head.assignment == expandedSubscriptions.asJava
-    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers.head.assignment}")
+      consumer.poll(50)
+      consumer.assignment == expandedSubscriptions.asJava
+    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${consumer.assignment}")
   }
 
   @Test
@@ -539,47 +549,52 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     createTopic(otherTopic, 2, serverCount)
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
     val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
-    this.consumers.head.subscribe(List(topic, otherTopic).asJava)
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic, otherTopic).asJava)
     TestUtils.waitUntilTrue(() => {
-      this.consumers.head.poll(50)
-      this.consumers.head.assignment == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment}")
+      consumer.poll(50)
+      consumer.assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment}")
 
-    this.consumers.head.subscribe(List(topic).asJava)
+    consumer.subscribe(List(topic).asJava)
     TestUtils.waitUntilTrue(() => {
-      this.consumers.head.poll(50)
-      this.consumers.head.assignment == shrunkenSubscriptions.asJava
-    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers.head.assignment}")
+      consumer.poll(50)
+      consumer.assignment == shrunkenSubscriptions.asJava
+    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${consumer.assignment}")
   }
 
   @Test
   def testPartitionsFor() {
     val numParts = 2
     createTopic("part-test", numParts, 1)
-    val parts = this.consumers.head.partitionsFor("part-test")
+    val consumer = createConsumer()
+    val parts = consumer.partitionsFor("part-test")
     assertNotNull(parts)
     assertEquals(2, parts.size)
   }
 
   @Test
   def testPartitionsForAutoCreate() {
-    val partitions = this.consumers.head.partitionsFor("non-exist-topic")
+    val consumer = createConsumer()
+    val partitions = consumer.partitionsFor("non-exist-topic")
     assertFalse(partitions.isEmpty)
   }
 
   @Test(expected = classOf[InvalidTopicException])
   def testPartitionsForInvalidTopic() {
-    this.consumers.head.partitionsFor(";3# ads,{234")
+    val consumer = createConsumer()
+    consumer.partitionsFor(";3# ads,{234")
   }
 
   @Test
   def testSeek() {
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
     val totalRecords = 50L
     val mid = totalRecords / 2
 
     // Test seek on non-compressed messages
-    sendRecords(totalRecords.toInt, tp)
+    val producer = createProducer()
+    sendRecords(producer, totalRecords.toInt, tp)
     consumer.assign(List(tp).asJava)
 
     consumer.seekToEnd(List(tp).asJava)
@@ -618,8 +633,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name)
     producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Int.MaxValue.toString)
-    val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
-        saslProperties = clientSaslProperties, lingerMs = Int.MaxValue, props = Some(producerProps))
+    val producer = createProducer(configOverrides = producerProps)
     (0 until numRecords).foreach { i =>
       producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes))
     }
@@ -628,67 +642,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
   @Test
   def testPositionAndCommit() {
-    sendRecords(5)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 5, tp)
 
-    assertNull(this.consumers.head.committed(new TopicPartition(topic, 15)))
+    val consumer = createConsumer()
+    assertNull(consumer.committed(new TopicPartition(topic, 15)))
 
     // position() on a partition that we haven't been assigned throws an exception
     intercept[IllegalStateException] {
-      this.consumers.head.position(new TopicPartition(topic, 15))
+      consumer.position(new TopicPartition(topic, 15))
     }
 
-    this.consumers.head.assign(List(tp).asJava)
+    consumer.assign(List(tp).asJava)
 
-    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers.head.position(tp))
-    this.consumers.head.commitSync()
-    assertEquals(0L, this.consumers.head.committed(tp).offset)
+    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, consumer.position(tp))
+    consumer.commitSync()
+    assertEquals(0L, consumer.committed(tp).offset)
 
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0)
-    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers.head.position(tp))
-    this.consumers.head.commitSync()
-    assertEquals("Committed offset should be returned", 5L, this.consumers.head.committed(tp).offset)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0)
+    assertEquals("After consuming 5 records, position should be 5", 5L, consumer.position(tp))
+    consumer.commitSync()
+    assertEquals("Committed offset should be returned", 5L, consumer.committed(tp).offset)
 
-    sendRecords(1)
+    sendRecords(producer, numRecords = 1, tp)
 
     // another consumer in the same group should get the same position
-    this.consumers(1).assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers(1), numRecords = 1, startingOffset = 5)
+    val otherConsumer = createConsumer()
+    otherConsumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5)
   }
 
   @Test
   def testPartitionPauseAndResume() {
     val partitions = List(tp).asJava
-    sendRecords(5)
-    this.consumers.head.assign(partitions)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0)
-    this.consumers.head.pause(partitions)
-    sendRecords(5)
-    assertTrue(this.consumers.head.poll(0).isEmpty)
-    this.consumers.head.resume(partitions)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 5)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 5, tp)
+
+    val consumer = createConsumer()
+    consumer.assign(partitions)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0)
+    consumer.pause(partitions)
+    sendRecords(producer, numRecords = 5, tp)
+    assertTrue(consumer.poll(0).isEmpty)
+    consumer.resume(partitions)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 5)
   }
 
   @Test
   def testFetchInvalidOffset() {
     this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
     // produce a couple of records
     val totalRecords = 2
-    sendRecords(totalRecords, tp)
-    consumer0.assign(List(tp).asJava)
+    val producer = createProducer()
+    sendRecords(producer, totalRecords, tp)
+    consumer.assign(List(tp).asJava)
 
     // poll should fail because there is no offset reset strategy set
     intercept[NoOffsetForPartitionException] {
-      consumer0.poll(50)
+      consumer.poll(50)
     }
 
     // seek to out of range position
     val outOfRangePos = totalRecords + 1
-    consumer0.seek(tp, outOfRangePos)
+    consumer.seek(tp, outOfRangePos)
     val e = intercept[OffsetOutOfRangeException] {
-      consumer0.poll(20000)
+      consumer.poll(20000)
     }
     val outOfRangePartitions = e.offsetOutOfRangePartitions()
     assertNotNull(outOfRangePartitions)
@@ -704,17 +724,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   private def checkLargeRecord(producerRecordSize: Int): Unit = {
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
     // produce a record that is larger than the configured fetch size
     val record = new ProducerRecord(tp.topic(), tp.partition(), "key".getBytes,
       new Array[Byte](producerRecordSize))
-    this.producers.head.send(record)
+    val producer = createProducer()
+    producer.send(record)
 
     // consuming a record that is too large should succeed since KIP-74
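+    // (KIP-74 guarantees progress: a fetch always returns the first record of the first
+    // non-empty partition even if it exceeds the fetch size, so an oversized record no
+    // longer stalls the consumer)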
-    consumer0.assign(List(tp).asJava)
-    val records = consumer0.poll(20000)
+    consumer.assign(List(tp).asJava)
+    val records = consumer.poll(20000)
     assertEquals(1, records.count)
     val consumerRecord = records.iterator().next()
     assertEquals(0L, consumerRecord.offset)
@@ -733,20 +753,20 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   private def checkFetchHonoursSizeIfLargeRecordNotFirst(largeProducerRecordSize: Int): Unit = {
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
     val smallRecord = new ProducerRecord(tp.topic(), tp.partition(), "small".getBytes,
       "value".getBytes)
     val largeRecord = new ProducerRecord(tp.topic(), tp.partition(), "large".getBytes,
       new Array[Byte](largeProducerRecordSize))
 
-    this.producers.head.send(smallRecord).get
-    this.producers.head.send(largeRecord).get
+    val producer = createProducer()
+    producer.send(smallRecord).get
+    producer.send(largeRecord).get
 
     // we should only get the small record in the first `poll`
-    consumer0.assign(List(tp).asJava)
-    val records = consumer0.poll(20000)
+    consumer.assign(List(tp).asJava)
+    val records = consumer.poll(20000)
     assertEquals(1, records.count)
     val consumerRecord = records.iterator().next()
     assertEquals(0L, consumerRecord.offset)
@@ -779,8 +799,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // this behaves a little differently from the case where the remaining limit is 0 bytes, so it's important to test it
     this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500")
     this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "100")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
     val topic1 = "topic1"
     val topic2 = "topic2"
@@ -795,17 +814,18 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       (0 until partitionCount).map(new TopicPartition(topic, _))
     }
 
-    assertEquals(0, consumer0.assignment().size)
+    assertEquals(0, consumer.assignment().size)
 
-    consumer0.subscribe(List(topic1, topic2, topic3).asJava)
+    consumer.subscribe(List(topic1, topic2, topic3).asJava)
 
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == partitions.toSet.asJava
-    }, s"Expected partitions ${partitions.asJava} but actually got ${consumer0.assignment}")
+      consumer.poll(50)
+      consumer.assignment() == partitions.toSet.asJava
+    }, s"Expected partitions ${partitions.asJava} but actually got ${consumer.assignment}")
 
-    val producerRecords = partitions.flatMap(sendRecords(partitionCount, _))
-    val consumerRecords = consumeRecords(consumer0, producerRecords.size)
+    val producer = createProducer()
+    val producerRecords = partitions.flatMap(sendRecords(producer, partitionCount, _))
+    val consumerRecords = consumeRecords(consumer, producerRecords.size)
 
     val expected = producerRecords.map { record =>
       (record.topic, record.partition, new String(record.key), new String(record.value), record.timestamp)
@@ -823,43 +843,44 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // 1 consumer using round-robin assignment
     this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
     this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName)
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
     // create two new topics, each having 2 partitions
     val topic1 = "topic1"
     val topic2 = "topic2"
-    val expectedAssignment = createTopicAndSendRecords(topic1, 2, 100) ++ createTopicAndSendRecords(topic2, 2, 100)
+    val producer = createProducer()
+    val expectedAssignment = createTopicAndSendRecords(producer, topic1, 2, 100) ++
+      createTopicAndSendRecords(producer, topic2, 2, 100)
 
-    assertEquals(0, consumer0.assignment().size)
+    assertEquals(0, consumer.assignment().size)
 
     // subscribe to two topics
-    consumer0.subscribe(List(topic1, topic2).asJava)
+    consumer.subscribe(List(topic1, topic2).asJava)
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == expectedAssignment.asJava
-    }, s"Expected partitions ${expectedAssignment.asJava} but actually got ${consumer0.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == expectedAssignment.asJava
+    }, s"Expected partitions ${expectedAssignment.asJava} but actually got ${consumer.assignment()}")
 
     // add one more topic with 2 partitions
     val topic3 = "topic3"
-    createTopicAndSendRecords(topic3, 2, 100)
+    createTopicAndSendRecords(producer, topic3, 2, 100)
 
     val newExpectedAssignment = expectedAssignment ++ Set(new TopicPartition(topic3, 0), new TopicPartition(topic3, 1))
-    consumer0.subscribe(List(topic1, topic2, topic3).asJava)
+    consumer.subscribe(List(topic1, topic2, topic3).asJava)
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == newExpectedAssignment.asJava
-    }, s"Expected partitions ${newExpectedAssignment.asJava} but actually got ${consumer0.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == newExpectedAssignment.asJava
+    }, s"Expected partitions ${newExpectedAssignment.asJava} but actually got ${consumer.assignment()}")
 
     // remove the topic we just added
-    consumer0.subscribe(List(topic1, topic2).asJava)
+    consumer.subscribe(List(topic1, topic2).asJava)
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == expectedAssignment.asJava
-    }, s"Expected partitions ${expectedAssignment.asJava} but actually got ${consumer0.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == expectedAssignment.asJava
+    }, s"Expected partitions ${expectedAssignment.asJava} but actually got ${consumer.assignment()}")
 
-    consumer0.unsubscribe()
-    assertEquals(0, consumer0.assignment().size)
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
   }
 
   @Test
@@ -870,24 +891,24 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // create two new topics; the total number of partitions must be greater than the number of consumers
     val topic1 = "topic1"
     val topic2 = "topic2"
-    val subscriptions = createTopicAndSendRecords(topic1, 5, 100) ++ createTopicAndSendRecords(topic2, 8, 100)
+    val producer = createProducer()
+    val subscriptions = createTopicAndSendRecords(producer, topic1, 5, 100) ++
+      createTopicAndSendRecords(producer, topic2, 8, 100)
 
     // create a group of consumers, subscribe the consumers to all the topics and start polling
     // for the topic partition assignment
-    val (_, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
+    val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
     try {
       validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
 
       // add one more consumer and validate re-assignment
-      addConsumersToGroupAndWaitForGroupAssignment(1, consumers, consumerPollers, List(topic1, topic2), subscriptions)
+      addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers,
+        List(topic1, topic2), subscriptions)
     } finally {
       consumerPollers.foreach(_.shutdown())
     }
   }
 
-  def reverse(m: Map[Long, Set[TopicPartition]]) =
-    m.values.toSet.flatten.map(v => (v, m.keys.filter(m(_).contains(v)).head)).toMap
-
   /**
    * This test runs the following scenario to verify sticky assignor behavior.
    * Topics: single-topic, with random number of partitions, where #par is 10, 20, 30, 40, 50, 60, 70, 80, 90, or 100
@@ -901,23 +922,27 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    */
   @Test
   def testMultiConsumerStickyAssignment() {
-    this.consumers.clear()
+
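+    // invert a pollerId -> assigned-partitions map into a partition -> pollerId map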
+    def reverse(m: Map[Long, Set[TopicPartition]]) =
+      m.values.toSet.flatten.map(v => (v, m.keys.filter(m(_).contains(v)).head)).toMap
+
     this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "sticky-group")
     this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[StickyAssignor].getName)
 
     // create one new topic
     val topic = "single-topic"
     val rand = 1 + scala.util.Random.nextInt(10)
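+    // rand * 10 gives a partition count of 10, 20, ..., or 100, matching the scenario described above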
-    val partitions = createTopicAndSendRecords(topic, rand * 10, 100)
+    val producer = createProducer()
+    val partitions = createTopicAndSendRecords(producer, topic, rand * 10, 100)
 
     // create a group of consumers, subscribe the consumers to the single topic and start polling
     // for the topic partition assignment
-    val (_, consumerPollers) = createConsumerGroupAndWaitForAssignment(9, List(topic), partitions)
+    val (consumerGroup, consumerPollers) = createConsumerGroupAndWaitForAssignment(9, List(topic), partitions)
     validateGroupAssignment(consumerPollers, partitions, s"Did not get valid initial assignment for partitions ${partitions.asJava}")
     val prePartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
 
     // add one more consumer and validate re-assignment
-    addConsumersToGroupAndWaitForGroupAssignment(1, consumers, consumerPollers, List(topic), partitions)
+    addConsumersToGroupAndWaitForGroupAssignment(1, consumerGroup, consumerPollers, List(topic), partitions)
 
     val postPartition2PollerId = reverse(consumerPollers.map(poller => (poller.getId, poller.consumerAssignment())).toMap)
     val keys = prePartition2PollerId.keySet.union(postPartition2PollerId.keySet)
@@ -944,23 +969,28 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testMultiConsumerDefaultAssignment() {
     // use consumers and topics defined in this class + one more topic
-    sendRecords(100, tp)
-    sendRecords(100, tp2)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 100, tp)
+    sendRecords(producer, numRecords = 100, tp2)
     val topic1 = "topic1"
-    val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(topic1, 5, 100)
+    val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(producer, topic1, 5, 100)
 
     // subscribe all consumers to all topics and validate the assignment
-    val consumerPollers = subscribeConsumers(consumers, List(topic, topic1))
 
+    val consumersInGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+    consumersInGroup += createConsumer()
+    consumersInGroup += createConsumer()
+
+    val consumerPollers = subscribeConsumers(consumersInGroup, List(topic, topic1))
     try {
       validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
 
       // add 2 more consumers and validate re-assignment
-      addConsumersToGroupAndWaitForGroupAssignment(2, consumers, consumerPollers, List(topic, topic1), subscriptions)
+      addConsumersToGroupAndWaitForGroupAssignment(2, consumersInGroup, consumerPollers, List(topic, topic1), subscriptions)
 
       // add one more topic and validate partition re-assignment
       val topic2 = "topic2"
-      val expandedSubscriptions = subscriptions ++ createTopicAndSendRecords(topic2, 3, 100)
+      val expandedSubscriptions = subscriptions ++ createTopicAndSendRecords(producer, topic2, 3, 100)
       changeConsumerGroupSubscriptionAndValidateAssignment(consumerPollers, List(topic, topic1, topic2), expandedSubscriptions)
 
       // remove the topic we just added and validate re-assignment
@@ -989,10 +1019,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // create producer with interceptor
     val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
+    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[MockProducerInterceptor].getName)
     producerProps.put("mock.interceptor.append", appendStr)
-    val testProducer = new KafkaProducer(producerProps, new StringSerializer, new StringSerializer)
+    val testProducer = createProducer(keySerializer = new StringSerializer,
+      valueSerializer = new StringSerializer,
+      configOverrides = producerProps)
 
     // produce records
     val numRecords = 10
@@ -1013,7 +1044,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // create consumer with interceptor
     this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
-    val testConsumer = new KafkaConsumer(this.consumerConfig, new StringDeserializer, new StringDeserializer)
+    val testConsumer = createConsumer(keyDeserializer = new StringDeserializer, valueDeserializer = new StringDeserializer)
     testConsumer.assign(List(tp).asJava)
     testConsumer.seek(tp, 0)
 
@@ -1053,7 +1084,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // produce records
     val numRecords = 100
-    val testProducer = new KafkaProducer[String, String](this.producerConfig, new StringSerializer, new StringSerializer)
+    val testProducer = createProducer(keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
     (0 until numRecords).map { i =>
       testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i", s"value $i"))
     }.foreach(_.get)
@@ -1061,7 +1092,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // create consumer with interceptor
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
     this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
-    val testConsumer = new KafkaConsumer[String, String](this.consumerConfig, new StringDeserializer(), new StringDeserializer())
+    val testConsumer = createConsumer(keyDeserializer = new StringDeserializer, valueDeserializer = new StringDeserializer)
     val rebalanceListener = new ConsumerRebalanceListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
         // keep partitions paused in this test so that we can verify the commits based on specific seeks
@@ -1104,16 +1135,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
     producerProps.put("mock.interceptor.append", appendStr)
-    val testProducer = new KafkaProducer(producerProps, new ByteArraySerializer(), new ByteArraySerializer())
-    producers += testProducer
+    val testProducer = createProducer(configOverrides = producerProps) // pass the interceptor config through
 
     // producing records should succeed
     testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key".getBytes, s"value will not be modified".getBytes))
 
     // create consumer with interceptor that has different key and value types from the consumer
     this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
-    val testConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += testConsumer
+    val testConsumer = createConsumer()
 
     testConsumer.assign(List(tp).asJava)
     testConsumer.seek(tp, 0)
@@ -1128,15 +1157,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testConsumeMessagesWithCreateTime() {
     val numRecords = 50
     // Test non-compressed messages
-    sendRecords(numRecords, tp)
-    this.consumers.head.assign(List(tp).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, startingOffset = 0, startingKeyAndValueIndex = 0,
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0, startingKeyAndValueIndex = 0,
       startingTimestamp = 0)
 
     // Test compressed messages
     sendCompressedMessages(numRecords, tp2)
-    this.consumers.head.assign(List(tp2).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+    consumer.assign(List(tp2).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
       startingTimestamp = 0)
   }
 
@@ -1152,16 +1183,19 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // Test non-compressed messages
     val tp1 = new TopicPartition(topicName, 0)
-    sendRecords(numRecords, tp1)
-    this.consumers.head.assign(List(tp1).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, tp = tp1, startingOffset = 0, startingKeyAndValueIndex = 0,
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp1)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp1).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = tp1, startingOffset = 0, startingKeyAndValueIndex = 0,
       startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
 
     // Test compressed messages
     val tp2 = new TopicPartition(topicName, 1)
     sendCompressedMessages(numRecords, tp2)
-    this.consumers.head.assign(List(tp2).asJava)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
+    consumer.assign(List(tp2).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = tp2, startingOffset = 0, startingKeyAndValueIndex = 0,
       startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME)
   }
 
@@ -1175,7 +1209,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     createTopic(topic2, numParts, 1)
     createTopic(topic3, numParts, 1)
 
-    val topics = this.consumers.head.listTopics()
+    val consumer = createConsumer()
+    val topics = consumer.listTopics()
     assertNotNull(topics)
     assertEquals(5, topics.size())
     assertEquals(5, topics.keySet().size())
@@ -1197,19 +1232,20 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     createTopic(topic2, numParts, 1, props)
     createTopic(topic3, numParts, 1)
 
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
 
     // Test negative target time
     intercept[IllegalArgumentException](
       consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(topic1, 0), -1)))
 
+    val producer = createProducer()
     val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]()
     var i = 0
     for (topic <- List(topic1, topic2, topic3)) {
       for (part <- 0 until numParts) {
         val tp = new TopicPartition(topic, part)
         // In sendRecords(), each message will have key, value and timestamp equal to the sequence number.
-        sendRecords(100, tp)
+        sendRecords(producer, numRecords = 100, tp)
         timestampsToSearch.put(tp, i * 20)
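+        // query a distinct target timestamp (0, 20, 40, ...) for each partition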
         i += 1
       }
@@ -1239,17 +1275,18 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testEarliestOrLatestOffsets() {
     val topic0 = "topicWithNewMessageFormat"
     val topic1 = "topicWithOldMessageFormat"
-    createTopicAndSendRecords(topicName = topic0, numPartitions = 2, recordsPerPartition = 100)
+    val producer = createProducer()
+    createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 2, recordsPerPartition = 100)
     val props = new Properties()
     props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
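+    // messages in the 0.9.0 format carry no timestamps, so this exercises the legacy
+    // message-format path when listing offsets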
     createTopic(topic1, numPartitions = 1, replicationFactor = 1, props)
-    sendRecords(100, new TopicPartition(topic1, 0))
+    sendRecords(producer, numRecords = 100, new TopicPartition(topic1, 0))
 
     val t0p0 = new TopicPartition(topic0, 0)
     val t0p1 = new TopicPartition(topic0, 1)
     val t1p0 = new TopicPartition(topic1, 0)
     val partitions = Set(t0p0, t0p1, t1p0).asJava
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
 
     val earliests = consumer.beginningOffsets(partitions)
     assertEquals(0L, earliests.get(t0p0))
@@ -1266,67 +1303,68 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testUnsubscribeTopic() {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
     val listener = new TestConsumerReassignmentListener()
-    consumer0.subscribe(List(topic).asJava, listener)
+    consumer.subscribe(List(topic).asJava, listener)
 
     // the initial subscription should cause a callback execution
     while (listener.callsToAssigned == 0)
-      consumer0.poll(50)
+      consumer.poll(50)
 
-    consumer0.subscribe(List[String]().asJava)
-    assertEquals(0, consumer0.assignment.size())
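+    // subscribing to an empty topic list is equivalent to unsubscribing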
+    consumer.subscribe(List[String]().asJava)
+    assertEquals(0, consumer.assignment.size())
   }
 
   @Test
   def testPauseStateNotPreservedByRebalance() {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
-    sendRecords(5)
-    consumer0.subscribe(List(topic).asJava)
-    consumeAndVerifyRecords(consumer = consumer0, numRecords = 5, startingOffset = 0)
-    consumer0.pause(List(tp).asJava)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 5, tp)
+    consumer.subscribe(List(topic).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0)
+    consumer.pause(List(tp).asJava)
 
     // subscribe to a new topic to trigger a rebalance
-    consumer0.subscribe(List("topic2").asJava)
+    consumer.subscribe(List("topic2").asJava)
 
     // after rebalance, our position should be reset and our pause state lost,
     // so we should be able to consume from the beginning
-    consumeAndVerifyRecords(consumer = consumer0, numRecords = 0, startingOffset = 5)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 0, startingOffset = 5)
   }
 
   @Test
   def testCommitSpecifiedOffsets() {
-    sendRecords(5, tp)
-    sendRecords(7, tp2)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 5, tp)
+    sendRecords(producer, numRecords = 7, tp2)
 
-    this.consumers.head.assign(List(tp, tp2).asJava)
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
 
     // Need to poll to join the group
-    this.consumers.head.poll(50)
-    val pos1 = this.consumers.head.position(tp)
-    val pos2 = this.consumers.head.position(tp2)
-    this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
-    assertEquals(3, this.consumers.head.committed(tp).offset)
-    assertNull(this.consumers.head.committed(tp2))
+    consumer.poll(50)
+    val pos1 = consumer.position(tp)
+    val pos2 = consumer.position(tp2)
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
+    assertEquals(3, consumer.committed(tp).offset)
+    assertNull(consumer.committed(tp2))
 
     // Positions should not change
-    assertEquals(pos1, this.consumers.head.position(tp))
-    assertEquals(pos2, this.consumers.head.position(tp2))
-    this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
-    assertEquals(3, this.consumers.head.committed(tp).offset)
-    assertEquals(5, this.consumers.head.committed(tp2).offset)
+    assertEquals(pos1, consumer.position(tp))
+    assertEquals(pos2, consumer.position(tp2))
+    consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
+    assertEquals(3, consumer.committed(tp).offset)
+    assertEquals(5, consumer.committed(tp2).offset)
 
     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers.head.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
-    awaitCommitCallback(this.consumers.head, commitCallback)
-    assertEquals(7, this.consumers.head.committed(tp2).offset)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
+    awaitCommitCallback(consumer, commitCallback)
+    assertEquals(7, consumer.committed(tp2).offset)
   }
 
   @Test
@@ -1335,44 +1373,44 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     createTopic(topic2, 2, serverCount)
 
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
-    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer0
+    val consumer = createConsumer()
 
     val numRecords = 10000
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
 
     val rebalanceListener = new ConsumerRebalanceListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
         // keep partitions paused in this test so that we can verify the commits based on specific seeks
-        consumer0.pause(partitions)
+        consumer.pause(partitions)
       }
 
       override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}
     }
 
-    consumer0.subscribe(List(topic).asJava, rebalanceListener)
+    consumer.subscribe(List(topic).asJava, rebalanceListener)
 
     val assignment = Set(tp, tp2)
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == assignment.asJava
-    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == assignment.asJava
+    }, s"Expected partitions ${assignment.asJava} but actually got ${consumer.assignment()}")
 
-    consumer0.seek(tp, 300)
-    consumer0.seek(tp2, 500)
+    consumer.seek(tp, 300)
+    consumer.seek(tp2, 500)
 
     // change subscription to trigger rebalance
-    consumer0.subscribe(List(topic, topic2).asJava, rebalanceListener)
+    consumer.subscribe(List(topic, topic2).asJava, rebalanceListener)
 
     val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
-      consumer0.assignment() == newAssignment.asJava
-    }, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}")
+      consumer.poll(50)
+      consumer.assignment() == newAssignment.asJava
+    }, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer.assignment()}")
 
     // after rebalancing, we should have reset to the committed positions
-    assertEquals(300, consumer0.committed(tp).offset)
-    assertEquals(500, consumer0.committed(tp2).offset)
+    assertEquals(300, consumer.committed(tp).offset)
+    assertEquals(500, consumer.committed(tp2).offset)
   }
 
   @Test
@@ -1381,47 +1419,44 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val topic2 = "topic2"
     createTopic(topic2, 2, serverCount)
     // send some messages.
-    sendRecords(numMessages, tp)
+    val producer = createProducer()
+    sendRecords(producer, numMessages, tp)
     // Test subscribe
     // Create a consumer and consume some messages.
     consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
     consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
-    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    try {
-      val listener0 = new TestConsumerReassignmentListener
-      consumer.subscribe(List(topic, topic2).asJava, listener0)
-      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
-      TestUtils.waitUntilTrue(() => {
-        records = consumer.poll(100)
-        !records.records(tp).isEmpty
-      }, "Consumer did not consume any message before timeout.")
-      assertEquals("should be assigned once", 1, listener0.callsToAssigned)
-      // Verify the metric exist.
-      val tags1 = new util.HashMap[String, String]()
-      tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
-      tags1.put("topic", tp.topic())
-      tags1.put("partition", String.valueOf(tp.partition()))
-
-      val tags2 = new util.HashMap[String, String]()
-      tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
-      tags2.put("topic", tp2.topic())
-      tags2.put("partition", String.valueOf(tp2.partition()))
-      val fetchLead0 = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))
-      assertNotNull(fetchLead0)
-      assertTrue(s"The lead should be ${records.count}", fetchLead0.metricValue() == records.count)
-
-      // Remove topic from subscription
-      consumer.subscribe(List(topic2).asJava, listener0)
-      TestUtils.waitUntilTrue(() => {
-        consumer.poll(100)
-        listener0.callsToAssigned >= 2
-      }, "Expected rebalance did not occur.")
-      // Verify the metric has gone
-      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)))
-      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)))
-    } finally {
-      consumer.close()
-    }
+    val consumer = createConsumer()
+    val listener0 = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic, topic2).asJava, listener0)
+    var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+    TestUtils.waitUntilTrue(() => {
+      records = consumer.poll(100)
+      !records.records(tp).isEmpty
+    }, "Consumer did not consume any message before timeout.")
+    assertEquals("should be assigned once", 1, listener0.callsToAssigned)
+    // Verify the metric exists.
+    val tags1 = new util.HashMap[String, String]()
+    tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+    tags1.put("topic", tp.topic())
+    tags1.put("partition", String.valueOf(tp.partition()))
+
+    val tags2 = new util.HashMap[String, String]()
+    tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+    tags2.put("topic", tp2.topic())
+    tags2.put("partition", String.valueOf(tp2.partition()))
+    val fetchLead0 = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))
+    assertNotNull(fetchLead0)
+    assertTrue(s"The lead should be ${records.count}", fetchLead0.metricValue() == records.count)
+
+    // Remove topic from subscription
+    consumer.subscribe(List(topic2).asJava, listener0)
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(100)
+      listener0.callsToAssigned >= 2
+    }, "Expected rebalance did not occur.")
+    // Verify the metric has gone
+    assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)))
+    assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)))
   }
 
   @Test
@@ -1430,48 +1465,45 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val topic2 = "topic2"
     createTopic(topic2, 2, serverCount)
     // send some messages.
-    sendRecords(numMessages, tp)
+    val producer = createProducer()
+    sendRecords(producer, numMessages, tp)
     // Test subscribe
     // Create a consumer and consume some messages.
     consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe")
     consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe")
-    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    try {
-      val listener0 = new TestConsumerReassignmentListener
-      consumer.subscribe(List(topic, topic2).asJava, listener0)
-      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
-      TestUtils.waitUntilTrue(() => {
-        records = consumer.poll(100)
-        !records.records(tp).isEmpty
-      }, "Consumer did not consume any message before timeout.")
-      assertEquals("should be assigned once", 1, listener0.callsToAssigned)
-      // Verify the metric exist.
-      val tags1 = new util.HashMap[String, String]()
-      tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
-      tags1.put("topic", tp.topic())
-      tags1.put("partition", String.valueOf(tp.partition()))
-
-      val tags2 = new util.HashMap[String, String]()
-      tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
-      tags2.put("topic", tp2.topic())
-      tags2.put("partition", String.valueOf(tp2.partition()))
-      val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))
-      assertNotNull(fetchLag0)
-      val expectedLag = numMessages - records.count
-      assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.value, epsilon)
-
-      // Remove topic from subscription
-      consumer.subscribe(List(topic2).asJava, listener0)
-      TestUtils.waitUntilTrue(() => {
-        consumer.poll(100)
-        listener0.callsToAssigned >= 2
-      }, "Expected rebalance did not occur.")
-      // Verify the metric has gone
-      assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)))
-      assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2)))
-    } finally {
-      consumer.close()
-    }
+    val consumer = createConsumer()
+    val listener0 = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic, topic2).asJava, listener0)
+    var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+    TestUtils.waitUntilTrue(() => {
+      records = consumer.poll(100)
+      !records.records(tp).isEmpty
+    }, "Consumer did not consume any message before timeout.")
+    assertEquals("should be assigned once", 1, listener0.callsToAssigned)
+    // Verify the metric exists.
+    val tags1 = new util.HashMap[String, String]()
+    tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
+    tags1.put("topic", tp.topic())
+    tags1.put("partition", String.valueOf(tp.partition()))
+
+    val tags2 = new util.HashMap[String, String]()
+    tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe")
+    tags2.put("topic", tp2.topic())
+    tags2.put("partition", String.valueOf(tp2.partition()))
+    val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))
+    assertNotNull(fetchLag0)
+    val expectedLag = numMessages - records.count
+    assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag0.value, epsilon)
+
+    // Remove topic from subscription
+    consumer.subscribe(List(topic2).asJava, listener0)
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(100)
+      listener0.callsToAssigned >= 2
+    }, "Expected rebalance did not occur.")
+    // Verify the metric has gone
+    assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)))
+    assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2)))
   }
 
   @Test
@@ -1479,34 +1511,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val numMessages = 1000
     // Test assign
     // send some messages.
-    sendRecords(numMessages, tp)
-    sendRecords(numMessages, tp2)
+    val producer = createProducer()
+    sendRecords(producer, numMessages, tp)
+    sendRecords(producer, numMessages, tp2)
+
     consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
     consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
-    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    try {
-      consumer.assign(List(tp).asJava)
-      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
-      TestUtils.waitUntilTrue(() => {
-        records = consumer.poll(100)
-        !records.records(tp).isEmpty
-      }, "Consumer did not consume any message before timeout.")
-      // Verify the metric exist.
-      val tags = new util.HashMap[String, String]()
-      tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign")
-      tags.put("topic", tp.topic())
-      tags.put("partition", String.valueOf(tp.partition()))
-      val fetchLead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
-      assertNotNull(fetchLead)
-
-      assertTrue(s"The lead should be ${records.count}", records.count == fetchLead.metricValue())
-
-      consumer.assign(List(tp2).asJava)
-      TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
-      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)))
-    } finally {
-      consumer.close()
-    }
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+    TestUtils.waitUntilTrue(() => {
+      records = consumer.poll(100)
+      !records.records(tp).isEmpty
+    }, "Consumer did not consume any message before timeout.")
+    // Verify the metric exists.
+    val tags = new util.HashMap[String, String]()
+    tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign")
+    tags.put("topic", tp.topic())
+    tags.put("partition", String.valueOf(tp.partition()))
+    val fetchLead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+    assertNotNull(fetchLead)
+
+    assertTrue(s"The lead should be ${records.count}", records.count == fetchLead.metricValue())
+
+    consumer.assign(List(tp2).asJava)
+    TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
+    assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)))
   }
 
   @Test
@@ -1514,103 +1544,99 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val numMessages = 1000
     // Test assign
     // send some messages.
-    sendRecords(numMessages, tp)
-    sendRecords(numMessages, tp2)
+    val producer = createProducer()
+    sendRecords(producer, numMessages, tp)
+    sendRecords(producer, numMessages, tp2)
+
     consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign")
     consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign")
-    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    try {
-      consumer.assign(List(tp).asJava)
-      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
-      TestUtils.waitUntilTrue(() => {
-        records = consumer.poll(100)
-        !records.records(tp).isEmpty
-      }, "Consumer did not consume any message before timeout.")
-      // Verify the metric exist.
-      val tags = new util.HashMap[String, String]()
-      tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
-      tags.put("topic", tp.topic())
-      tags.put("partition", String.valueOf(tp.partition()))
-      val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
-      assertNotNull(fetchLag)
-
-      val expectedLag = numMessages - records.count
-      assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
-
-      consumer.assign(List(tp2).asJava)
-      TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
-      assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
-      assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)))
-    } finally {
-      consumer.close()
-    }
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+    TestUtils.waitUntilTrue(() => {
+      records = consumer.poll(100)
+      !records.records(tp).isEmpty
+    }, "Consumer did not consume any message before timeout.")
+    // Verify the metric exists.
+    val tags = new util.HashMap[String, String]()
+    tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign")
+    tags.put("topic", tp.topic())
+    tags.put("partition", String.valueOf(tp.partition()))
+    val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
+    assertNotNull(fetchLag)
+
+    val expectedLag = numMessages - records.count
+    assertEquals(s"The lag should be $expectedLag", expectedLag, fetchLag.value, epsilon)
+
+    consumer.assign(List(tp2).asJava)
+    TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
+    assertNull(consumer.metrics.get(new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)))
+    assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)))
   }
 
   @Test
   def testPerPartitionLeadWithMaxPollRecords() {
     val numMessages = 1000
     val maxPollRecords = 10
-    sendRecords(numMessages, tp)
+    val producer = createProducer()
+    sendRecords(producer, numMessages, tp)
+
     consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
     consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
     consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
-    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
-    try {
-      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
-      TestUtils.waitUntilTrue(() => {
-        records = consumer.poll(100)
-        !records.isEmpty
-      }, "Consumer did not consume any message before timeout.")
-
-      val tags = new util.HashMap[String, String]()
-      tags.put("client-id", "testPerPartitionLeadWithMaxPollRecords")
-      tags.put("topic", tp.topic())
-      tags.put("partition", String.valueOf(tp.partition()))
-      val lead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
-      assertTrue(s"The lead should be $maxPollRecords", lead.metricValue() == maxPollRecords)
-    } finally {
-      consumer.close()
-    }
+    var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+    TestUtils.waitUntilTrue(() => {
+      records = consumer.poll(100)
+      !records.isEmpty
+    }, "Consumer did not consume any message before timeout.")
+
+    val tags = new util.HashMap[String, String]()
+    tags.put("client-id", "testPerPartitionLeadWithMaxPollRecords")
+    tags.put("topic", tp.topic())
+    tags.put("partition", String.valueOf(tp.partition()))
+    val lead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+    assertTrue(s"The lead should be $maxPollRecords", lead.metricValue() == maxPollRecords)
   }
 
   @Test
   def testPerPartitionLagWithMaxPollRecords() {
     val numMessages = 1000
     val maxPollRecords = 10
-    sendRecords(numMessages, tp)
+    val producer = createProducer()
+    sendRecords(producer, numMessages, tp)
+
     consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagWithMaxPollRecords")
     consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagWithMaxPollRecords")
     consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
-    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
-    try {
-      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
-      TestUtils.waitUntilTrue(() => {
-        records = consumer.poll(100)
-        !records.isEmpty
-      }, "Consumer did not consume any message before timeout.")
-
-      val tags = new util.HashMap[String, String]()
-      tags.put("client-id", "testPerPartitionLagWithMaxPollRecords")
-      tags.put("topic", tp.topic())
-      tags.put("partition", String.valueOf(tp.partition()))
-      val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
-
-      assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.value, epsilon)
-    } finally {
-      consumer.close()
-    }
+    var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+    TestUtils.waitUntilTrue(() => {
+      records = consumer.poll(100)
+      !records.isEmpty
+    }, "Consumer did not consume any message before timeout.")
+
+    val tags = new util.HashMap[String, String]()
+    tags.put("client-id", "testPerPartitionLagWithMaxPollRecords")
+    tags.put("topic", tp.topic())
+    tags.put("partition", String.valueOf(tp.partition()))
+    val lag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))
+
+    assertEquals(s"The lag should be ${numMessages - records.count}", numMessages - records.count, lag.value, epsilon)
   }
 
   @Test
   def testQuotaMetricsNotCreatedIfNoQuotasConfigured() {
     val numRecords = 1000
-    sendRecords(numRecords)
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
 
-    this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.seek(tp, 0)
-    consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = numRecords, startingOffset = 0)
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0)
 
     def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId: String) {
         val metricName = broker.metrics.metricName("throttle-time",
@@ -1640,21 +1666,19 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
     // Use consumers defined in this class plus one additional consumer
     // Use topic defined in this class plus one additional topic
-    sendRecords(100, tp)
-    sendRecords(100, tp2)
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 100, tp)
+    sendRecords(producer, numRecords = 100, tp2)
     val topic1 = "topic1"
-    val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(topic1, 6, 100)
+    val subscriptions = Set(tp, tp2) ++ createTopicAndSendRecords(producer, topic1, 6, 100)
 
     // first subscribe consumers that are defined in this class
     val consumerPollers = Buffer[ConsumerAssignmentPoller]()
-    for (consumer <- consumers)
-      consumerPollers += subscribeConsumerAndStartPolling(consumer, List(topic, topic1))
+    consumerPollers += subscribeConsumerAndStartPolling(createConsumer(), List(topic, topic1))
+    consumerPollers += subscribeConsumerAndStartPolling(createConsumer(), List(topic, topic1))
 
     // create one more consumer and add it to the group; we will timeout this consumer
-    val timeoutConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
-    // Close the consumer on test teardown, unless this test will manually
-    if(!closeConsumer)
-      consumers += timeoutConsumer
+    val timeoutConsumer = createConsumer()
     val timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List(topic, topic1))
     consumerPollers += timeoutPoller
 
@@ -1679,12 +1703,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   * Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition'
   * records to each partition using the given producer
    */
-  def createTopicAndSendRecords(topicName: String, numPartitions: Int, recordsPerPartition: Int): Set[TopicPartition] = {
+  def createTopicAndSendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                                topicName: String,
+                                numPartitions: Int,
+                                recordsPerPartition: Int): Set[TopicPartition] = {
     createTopic(topicName, numPartitions, serverCount)
     var parts = Set[TopicPartition]()
     for (partition <- 0 until numPartitions) {
       val tp = new TopicPartition(topicName, partition)
-      sendRecords(recordsPerPartition, tp)
+      sendRecords(producer, recordsPerPartition, tp)
       parts = parts + tp
     }
     parts
@@ -1742,12 +1769,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertTrue(consumerCount <= subscriptions.size)
     val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
     for (_ <- 0 until consumerCount)
-      consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
-    consumers ++= consumerGroup
+      consumerGroup += createConsumer()
 
     // create consumer pollers, wait for assignment and validate it
     val consumerPollers = subscribeConsumers(consumerGroup, topicsToSubscribe)
-
     (consumerGroup, consumerPollers)
   }
 
@@ -1771,14 +1796,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
                                                    subscriptions: Set[TopicPartition]): Unit = {
     assertTrue(consumerGroup.size + numOfConsumersToAdd <= subscriptions.size)
     for (_ <- 0 until numOfConsumersToAdd) {
-      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+      val consumer = createConsumer()
       consumerGroup += consumer
       consumerPollers += subscribeConsumerAndStartPolling(consumer, topicsToSubscribe)
     }
 
     // wait until topics get re-assigned and validate assignment
     validateGroupAssignment(consumerPollers, subscriptions,
-      s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added ${numOfConsumersToAdd} consumer(s)")
+      s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added $numOfConsumersToAdd consumer(s)")
   }
 
   /**
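
Note: the pattern above repeats throughout PlaintextConsumerTest. A minimal sketch of the shape these tests now take (topic, record count and offsets are illustrative, not taken from the patch):

    val producer = createProducer()
    sendRecords(producer, numRecords = 100, tp)

    val consumer = createConsumer()
    consumer.assign(List(tp).asJava)
    consumeAndVerifyRecords(consumer = consumer, numRecords = 100, startingOffset = 0)
    // No try/finally close: clients built via createProducer()/createConsumer() are
    // presumably tracked and closed by IntegrationTestHarness on teardown, which is
    // why the explicit close() calls and `consumers += ...` bookkeeping disappear above.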
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
index 0b2fbca..fc1853b 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -78,7 +78,8 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   @Test
   def testListenerName() {
     // To check the client listener name, establish a session on the server by sending any request, e.g. sendRecords
-    intercept[TopicAuthorizationException](sendRecords(1, tp))
+    val producer = createProducer()
+    intercept[TopicAuthorizationException](sendRecords(producer, numRecords = 1, tp))
 
     assertEquals(Some("CLIENT"), PlaintextEndToEndAuthorizationTest.clientListenerName)
     assertEquals(Some("SERVER"), PlaintextEndToEndAuthorizationTest.serverListenerName)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 8ae3952..d2ff2a8 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -43,17 +43,17 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
 
   @Test
   def testBatchSizeZero() {
-    val producerProps = new Properties()
-    producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0")
-    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps))
+    val producer = createProducer(brokerList = brokerList,
+      lingerMs = Int.MaxValue,
+      batchSize = 0)
     sendAndVerify(producer)
   }
 
   @Test
   def testSendCompressedMessageWithLogAppendTime() {
-    val producerProps = new Properties()
-    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
-    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps))
+    val producer = createProducer(brokerList = brokerList,
+      compressionType = "gzip",
+      lingerMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
@@ -101,9 +101,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     }
 
     // Test compressed messages.
-    val producerProps = new Properties()
-    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
-    val compressedProducer = createProducer(brokerList = brokerList, props = Some(producerProps))
+    val compressedProducer = createProducer(brokerList = brokerList, compressionType = "gzip")
     try {
       compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
       fail("Should throw CorruptedRecordException")
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index e4a1828..bb49884 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -30,7 +30,6 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.ByteArraySerializer
 
 @RunWith(value = classOf[Parameterized])
@@ -70,7 +69,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
     producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200")
     val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer)
-    val consumer = TestUtils.createConsumer(bootstrapServers, securityProtocol = SecurityProtocol.PLAINTEXT)
+    val consumer = TestUtils.createConsumer(bootstrapServers)
 
     try {
       // create topic
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index ebc587e..c353b52 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.SaslAuthenticationException
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
@@ -74,11 +73,12 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
 
   @Test
   def testProducerWithAuthenticationFailure() {
-    verifyAuthenticationException(sendOneRecord(10000))
-    verifyAuthenticationException(producers.head.partitionsFor(topic))
+    val producer = createProducer()
+    verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 10000))
+    verifyAuthenticationException(producer.partitionsFor(topic))
 
     createClientCredential()
-    verifyWithRetry(sendOneRecord())
+    verifyWithRetry(sendOneRecord(producer))
   }
 
   @Test
@@ -97,14 +97,14 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
 
   @Test
   def testConsumerWithAuthenticationFailure() {
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
     verifyConsumerWithAuthenticationFailure(consumer)
   }
 
   @Test
   def testManualAssignmentConsumerWithAuthenticationFailure() {
-    val consumer = this.consumers.head
+    val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     verifyConsumerWithAuthenticationFailure(consumer)
   }
@@ -112,11 +112,9 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   @Test
   def testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure() {
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString)
-    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
-    consumers += consumer
+    val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     consumer.seek(tp, 0)
-
     verifyConsumerWithAuthenticationFailure(consumer)
   }
 
@@ -125,7 +123,8 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     verifyAuthenticationException(consumer.partitionsFor(topic))
 
     createClientCredential()
-    verifyWithRetry(sendOneRecord())
+    val producer = createProducer()
+    verifyWithRetry(sendOneRecord(producer))
     verifyWithRetry(assertEquals(1, consumer.poll(Duration.ofMillis(1000)).count))
   }
 
@@ -161,7 +160,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   def testConsumerGroupServiceWithAuthenticationFailure() {
     val consumerGroupService: ConsumerGroupService = prepareConsumerGroupService
 
-    val consumer = consumers.head
+    val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
 
     verifyAuthenticationException(consumerGroupService.listGroups)
@@ -173,7 +172,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     createClientCredential()
     val consumerGroupService: ConsumerGroupService = prepareConsumerGroupService
 
-    val consumer = consumers.head
+    val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
 
     verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))
@@ -201,8 +200,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)
   }
 
-  private def sendOneRecord(maxWaitMs: Long = 15000): Unit = {
-    val producer = this.producers.head
+  private def sendOneRecord(producer: KafkaProducer[Array[Byte], Array[Byte]], maxWaitMs: Long = 15000): Unit = {
     val record = new ProducerRecord(tp.topic(), tp.partition(), 0L, "key".getBytes, "value".getBytes)
     val future = producer.send(record)
     producer.flush()
@@ -243,12 +241,6 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   private def createTransactionalProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
     producerConfig.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1")
     producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    val txProducer = TestUtils.createProducer(brokerList,
-                                  securityProtocol = this.securityProtocol,
-                                  saslProperties = this.clientSaslProperties,
-                                  acks = -1,
-                                  props = Some(producerConfig))
-    producers += txProducer
-    txProducer
+    createProducer()
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index c252f81..572d9d3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -16,10 +16,6 @@
   */
 package kafka.api
 
-import java.util.Properties
-
-import kafka.utils.TestUtils
-import kafka.utils.Implicits._
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthorizationException}
@@ -57,21 +53,13 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
   @Test(timeout = 15000)
   def testTwoConsumersWithDifferentSaslCredentials(): Unit = {
     setAclsAndProduce(tp)
-    val consumer1 = consumers.head
+    val consumer1 = createConsumer()
 
-    val consumer2Config = new Properties
-    consumer2Config ++= consumerConfig
     // consumer2 retrieves its credentials from the static JAAS configuration, so we also test this path
-    consumer2Config.remove(SaslConfigs.SASL_JAAS_CONFIG)
-    consumer2Config.remove(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS)
-
-    val consumer2 = TestUtils.createConsumer(brokerList,
-                                                securityProtocol = securityProtocol,
-                                                trustStoreFile = trustStoreFile,
-                                                saslProperties = clientSaslProperties,
-                                                props = Some(consumer2Config))
-    consumers += consumer2
+    consumerConfig.remove(SaslConfigs.SASL_JAAS_CONFIG)
+    consumerConfig.remove(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS)
 
+    val consumer2 = createConsumer()
     consumer1.assign(List(tp).asJava)
     consumer2.assign(List(tp).asJava)
 
diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index a7e33e0..2e51bff 100644
--- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -16,7 +16,7 @@ import java.io.File
 
 import kafka.server.KafkaConfig
 import org.junit.{After, Before, Test}
-import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
 import scala.collection.JavaConverters._
@@ -46,20 +46,12 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
   @Test
   def testMultipleBrokerMechanisms() {
 
-    val plainSaslProducer = producers.head
-    val plainSaslConsumer = consumers.head
+    val plainSaslProducer = createProducer()
+    val plainSaslConsumer = createConsumer()
 
     val gssapiSaslProperties = kafkaClientSaslProperties("GSSAPI", dynamicJaasConfig = true)
-    val gssapiSaslProducer = TestUtils.createProducer(brokerList,
-                                                         securityProtocol = this.securityProtocol,
-                                                         trustStoreFile = this.trustStoreFile,
-                                                         saslProperties = Some(gssapiSaslProperties))
-    producers += gssapiSaslProducer
-    val gssapiSaslConsumer = TestUtils.createConsumer(brokerList,
-                                                         securityProtocol = this.securityProtocol,
-                                                         trustStoreFile = this.trustStoreFile,
-                                                         saslProperties = Some(gssapiSaslProperties))
-    consumers += gssapiSaslConsumer
+    val gssapiSaslProducer = createProducer(configOverrides = gssapiSaslProperties)
+    val gssapiSaslConsumer = createConsumer(configOverrides = gssapiSaslProperties)
     val numRecords = 1000
     var startingOffset = 0
 
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index a65cc2e..eabafcc 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -22,17 +22,14 @@ import java.util.Properties
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ShutdownableThread, TestUtils}
-import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-
 class TransactionsBounceTest extends KafkaServerTestHarness {
   private val producerBufferSize = 65536
   private val serverMessageMaxBytes = producerBufferSize / 2
@@ -150,18 +147,13 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
     verifyingConsumer.close()
   }
 
-  private def createConsumerAndSubscribeToTopics(groupId: String, topics: List[String], readCommitted: Boolean = false) = {
-    val props = new Properties()
-    if (readCommitted)
-      props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
-    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000")
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000")
-    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-    val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = groupId,
-      securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+  private def createConsumerAndSubscribeToTopics(groupId: String,
+                                                 topics: List[String],
+                                                 readCommitted: Boolean = false) = {
+    val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
+      groupId = groupId,
+      readCommitted = readCommitted,
+      enableAutoCommit = false)
     consumer.subscribe(topics.asJava)
     consumer
   }
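
For illustration, with the new TestUtils.createConsumer signature (shown in full in the TestUtils.scala hunk at the end of this patch), a read-committed consumer is requested with named flags rather than a hand-built Properties bag; the group id below is a placeholder:

    val consumer = TestUtils.createConsumer(brokerList,
      groupId = "my-group",        // placeholder
      enableAutoCommit = false,
      readCommitted = true,        // maps to isolation.level=read_committed
      maxPollRecords = 2000)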
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 461d3b6..58059b4 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetA
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.ProducerFencedException
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
@@ -570,28 +569,28 @@ class TransactionsTest extends KafkaServerTestHarness {
     serverProps
   }
 
-  private def createReadCommittedConsumer(group: String = "group", maxPollRecords: Int = 500,
+  private def createReadCommittedConsumer(group: String = "group",
+                                          maxPollRecords: Int = 500,
                                           props: Properties = new Properties) = {
-    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
     val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
-      groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+      groupId = group,
+      enableAutoCommit = false,
+      readCommitted = true,
+      maxPollRecords = maxPollRecords)
     transactionalConsumers += consumer
     consumer
   }
 
   private def createReadUncommittedConsumer(group: String) = {
-    val props = new Properties()
-    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
-      groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+      groupId = group,
+      enableAutoCommit = false)
     nonTransactionalConsumers += consumer
     consumer
   }
 
-  private def createTransactionalProducer(transactionalId: String, transactionTimeoutMs: Long = 60000): KafkaProducer[Array[Byte], Array[Byte]] = {
+  private def createTransactionalProducer(transactionalId: String,
+                                          transactionTimeoutMs: Long = 60000): KafkaProducer[Array[Byte], Array[Byte]] = {
     val producer = TestUtils.createTransactionalProducer(transactionalId, servers,
       transactionTimeoutMs = transactionTimeoutMs)
     transactionalProducers += producer
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 47c8f5f..828c98b 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -42,7 +42,10 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
   }
 
   override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
-    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producers.head, consumers.head) {
+    val producer = createProducer()
+    val consumer = createConsumer()
+
+    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producer, consumer) {
       override def userPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "O=A client,CN=localhost")
       override def quotaMetricTags(clientId: String): Map[String, String] = {
         Map("user" -> Sanitizer.sanitize(userPrincipal.getName), "client-id" -> clientId)
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index 3386c91..d60b875 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -50,7 +50,10 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
   }
 
   override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {
-    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producers.head, consumers.head) {
+    val producer = createProducer()
+    val consumer = createConsumer()
+
+    new QuotaTestClients(topic, leaderNode, producerClientId, consumerClientId, producer, consumer) {
       override val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
       override def quotaMetricTags(clientId: String): Map[String, String] = {
         Map("user" -> userPrincipal.getName, "client-id" -> "")
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 95a4665..f772e58 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.config.SslConfigs._
@@ -733,7 +733,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     // Verify that producer connections fail since advertised listener is invalid
     val bootstrap = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
       .replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed
-    val producer1 = ProducerBuilder().trustStoreProps(sslProperties1)
+    val producer1 = ProducerBuilder()
+      .trustStoreProps(sslProperties1)
       .maxRetries(0)
       .requestTimeoutMs(1000)
       .deliveryTimeoutMs(1000)
@@ -741,7 +742,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       .build()
 
     assertTrue(intercept[ExecutionException] {
-      producer1.send(new ProducerRecord(topic, "key", "value")).get(2, TimeUnit.SECONDS)
+      val future = producer1.send(new ProducerRecord(topic, "key", "value"))
+      future.get(2, TimeUnit.SECONDS)
     }.getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException])
 
     alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
@@ -1378,16 +1380,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     def deliveryTimeoutMs(timeoutMs: Int): ProducerBuilder = { _deliveryTimeoutMs = timeoutMs; this }
 
     override def build(): KafkaProducer[String, String] = {
-      val producer = TestUtils.createProducer(bootstrapServers,
-        acks = _acks,
-        requestTimeoutMs = _requestTimeoutMs,
-        deliveryTimeoutMs = _deliveryTimeoutMs,
-        retries = _retries,
-        securityProtocol = _securityProtocol,
-        trustStoreFile = Some(trustStoreFile1),
-        keySerializer = new StringSerializer,
-        valueSerializer = new StringSerializer,
-        props = Some(propsOverride))
+      val producerProps = propsOverride
+      producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+      producerProps.put(ProducerConfig.ACKS_CONFIG, _acks.toString)
+      producerProps.put(ProducerConfig.RETRIES_CONFIG, _retries.toString)
+      producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, _deliveryTimeoutMs.toString)
+      producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, _requestTimeoutMs.toString)
+
+      val producer = new KafkaProducer[String, String](producerProps, new StringSerializer, new StringSerializer)
       producers += producer
       producer
     }
@@ -1404,15 +1404,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
     override def build(): KafkaConsumer[String, String] = {
       val consumerProps = propsOverride
+      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+      consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, _autoOffsetReset)
+      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group)
       consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, _enableAutoCommit.toString)
-      val consumer = TestUtils.createConsumer(bootstrapServers,
-        group,
-        autoOffsetReset = _autoOffsetReset,
-        securityProtocol = _securityProtocol,
-        trustStoreFile = Some(trustStoreFile1),
-        keyDeserializer = new StringDeserializer,
-        valueDeserializer = new StringDeserializer,
-        props = Some(consumerProps))
+
+      val consumer = new KafkaConsumer[String, String](consumerProps, new StringDeserializer, new StringDeserializer)
       consumer.subscribe(Collections.singleton(_topic))
       if (_autoOffsetReset == "latest")
         awaitInitialPositions(consumer)
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 037f614..04166c6 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -33,9 +33,6 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
-
-  override val producerCount = 0
-  override val consumerCount = 0
   override val serverCount = 1
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
 
diff --git a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
index 8502874..0f40650 100644
--- a/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ScramServerStartupTest.scala
@@ -17,6 +17,7 @@
   */
 
 package kafka.server
+
 import java.util.Collections
 
 import kafka.api.{IntegrationTestHarness, KafkaSasl, SaslSetup}
@@ -35,8 +36,6 @@ import scala.collection.JavaConverters._
  */
 class ScramServerStartupTest extends IntegrationTestHarness with SaslSetup {
 
-  override val producerCount = 0
-  override val consumerCount = 0
   override val serverCount = 1
 
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 21c1365..3e462db 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1516,8 +1516,6 @@ class GroupMetadataManagerTest {
   @Test
   def testOffsetExpirationOfSimpleConsumer() {
     val memberId = "memberId"
-    val clientId = "clientId"
-    val clientHost = "localhost"
     val topic = "foo"
     val topicPartition1 = new TopicPartition(topic, 0)
     val offset = 37
@@ -1531,7 +1529,6 @@ class GroupMetadataManagerTest {
     val startMs = time.milliseconds
     // old clients, expiry timestamp is explicitly set
     val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
-    val tp2OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
     // new clients, no per-partition expiry timestamp, offsets of group expire together
     val offsets = immutable.Map(
       topicPartition1 -> tp1OffsetAndMetadata)
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index ebec1d3..0250cfb 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -30,7 +30,6 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.network.ListenerName
@@ -269,11 +268,11 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   private def consumeAllMessages(topic: String, numMessages: Int): Seq[String] = {
     val brokerList = TestUtils.bootstrapServers(servers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
-    val props = new Properties
     // Don't rely on coordinator as it may be down when this method is called
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-    val consumer = TestUtils.createConsumer(brokerList, "group" + random.nextLong,
-      securityProtocol = SecurityProtocol.PLAINTEXT, valueDeserializer = new StringDeserializer, props = Some(props))
+    val consumer = TestUtils.createConsumer(brokerList,
+      groupId = "group" + random.nextLong,
+      enableAutoCommit = false,
+      valueDeserializer = new StringDeserializer)
     try {
       val tp = new TopicPartition(topic, partitionId)
       consumer.assign(Seq(tp).asJava)
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
index 1739d27..e6fa1cb 100644
--- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
@@ -32,9 +32,8 @@ import scala.collection.mutable
 import scala.util.Random
 
 class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
-
-  override def numBrokers: Int = 1
-  override def logDirCount: Int = 5
+  override val logDirCount = 5
+  override val numBrokers = 1
 
   val topic = "topic"
 
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 99355bc..4d178d7 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -22,7 +22,7 @@ import java.net.Socket
 import java.nio.ByteBuffer
 import java.util.Properties
 
-import kafka.integration.KafkaServerTestHarness
+import kafka.api.IntegrationTestHarness
 import kafka.network.SocketServer
 import kafka.utils._
 import org.apache.kafka.common.network.ListenerName
@@ -31,18 +31,17 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse, RequestHeader, ResponseHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
-abstract class BaseRequestTest extends KafkaServerTestHarness {
+abstract class BaseRequestTest extends IntegrationTestHarness {
+  override val serverCount: Int = numBrokers
   private var correlationId = 0
 
   // If required, override to set the number of brokers
   protected def numBrokers: Int = 3
 
-  protected def logDirCount: Int = 1
-
   // If required, override properties by mutating the passed Properties object
   protected def propertyOverrides(properties: Properties) {}
 
-  def generateConfigs = {
+  override def generateConfigs = {
     val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
       enableControlledShutdown = false, enableDeleteTopic = true,
       interBrokerSecurityProtocol = Some(securityProtocol),
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index 35e3262..5a0244b 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -26,9 +26,8 @@ import org.junit.Test
 import java.io.File
 
 class DescribeLogDirsRequestTest extends BaseRequestTest {
-
-  override def numBrokers: Int = 1
-  override def logDirCount: Int = 2
+  override val logDirCount = 2
+  override val numBrokers: Int = 1
 
   val topic = "topic"
   val partitionNum = 2
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 388b0f8..694b19d 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 import kafka.api.KAFKA_0_11_0_IV2
 import kafka.log.LogConfig
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, Record, RecordBatch}
@@ -201,11 +201,11 @@ class FetchRequestTest extends BaseRequestTest {
 
     val msgValueLen = 100 * 1000
     val batchSize = 4 * msgValueLen
-    val propsOverride = new Properties
-    propsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      lingerMs = Int.MaxValue, keySerializer = new StringSerializer,
-      valueSerializer = new ByteArraySerializer, props = Some(propsOverride))
+      lingerMs = Int.MaxValue,
+      batchSize = batchSize,
+      keySerializer = new StringSerializer,
+      valueSerializer = new ByteArraySerializer)
     val bytes = new Array[Byte](msgValueLen)
     val futures = try {
       (0 to 1000).map { _ =>
@@ -262,7 +262,9 @@ class FetchRequestTest extends BaseRequestTest {
   def testDownConversionFromBatchedToUnbatchedRespectsOffset(): Unit = {
     // Increase linger so that we have control over the batches created
     producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer,
+      retries = 5,
+      keySerializer = new StringSerializer,
+      valueSerializer = new StringSerializer,
       lingerMs = 30 * 1000,
       deliveryTimeoutMs = 60 * 1000)
 
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 9b32cd2..2dcbb8a 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -25,7 +25,7 @@ import kafka.api.IntegrationTestHarness
 import kafka.controller.{OfflineReplica, PartitionAndReplica}
 import kafka.utils.{CoreUtils, Exit, TestUtils}
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
@@ -44,8 +44,8 @@ class LogDirFailureTest extends IntegrationTestHarness {
   val serverCount: Int = 2
   private val topic = "topic"
   private val partitionNum = 12
+  override val logDirCount = 3
 
-  this.logDirCount = 3
   this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp, "60000")
   this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1")
 
@@ -55,16 +55,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
     createTopic(topic, partitionNum, serverCount)
   }
 
-  override def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    TestUtils.createProducer(brokerList,
-      retries = 0,
-      securityProtocol = this.securityProtocol,
-      trustStoreFile = this.trustStoreFile,
-      saslProperties = this.clientSaslProperties,
-      props = Some(producerConfig))
-  }
-
-
   @Test
   def testIOExceptionDuringLogRoll() {
     testProduceAfterLogDirFailureOnLeader(Roll)
@@ -107,7 +97,8 @@ class LogDirFailureTest extends IntegrationTestHarness {
 
   @Test
   def testReplicaFetcherThreadAfterLogDirFailureOnFollower() {
-    val producer = producers.head
+    this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
+    val producer = createProducer()
     val partition = new TopicPartition(topic, 0)
 
     val partitionInfo = producer.partitionsFor(topic).asScala.find(_.partition() == 0).get
@@ -134,9 +125,12 @@ class LogDirFailureTest extends IntegrationTestHarness {
   }
 
   def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType) {
-    val consumer = consumers.head
+    val consumer = createConsumer()
     subscribeAndWaitForAssignment(topic, consumer)
-    val producer = producers.head
+
+    this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
+    val producer = createProducer()
+
     val partition = new TopicPartition(topic, 0)
     val record = new ProducerRecord(topic, 0, s"key".getBytes, s"value".getBytes)
 
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 6b61381..4e60a8f 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.apache.kafka.test.TestUtils.isValidClusterId
 
 import scala.collection.JavaConverters._
@@ -38,6 +38,11 @@ class MetadataRequestTest extends BaseRequestTest {
     properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}")
   }
 
+  @Before
+  override def setUp(): Unit = {
+    doSetup(createOffsetsTopic = false)
+  }
+
   @Test
   def testClusterIdWithRequestVersion1() {
     val v1MetadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 149f05f..4859019 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -29,7 +29,7 @@ import kafka.utils.{CoreUtils, Logging, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.serialization.Deserializer
@@ -411,11 +411,11 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
   }
 
   private def createBufferingProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = -1, lingerMs = 10000,
-      props = Option(CoreUtils.propsWith(
-        (ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(msg.length * 1000))
-        , (ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
-      )))
+    TestUtils.createProducer(getBrokerListStrFromServers(brokers),
+      acks = -1,
+      lingerMs = 10000,
+      batchSize = msg.length * 1000,
+      compressionType = "snappy")
   }
 
   private def getLogFile(broker: KafkaServer, partition: Int): File = {
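
The producer helper gets the same treatment in the TestUtils.scala hunk below: batchSize and compressionType become first-class parameters, so props overrides like the one removed above are no longer needed. A sketch with illustrative values:

    val producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers),
      acks = -1,
      lingerMs = 10000,
      batchSize = 64 * 1024,       // illustrative
      compressionType = "snappy")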
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bdd6882..1e6f022 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -39,7 +39,7 @@ import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.{AdminZkClient, BrokerIdsZNode, BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
+import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{KafkaFuture, TopicPartition}
 import org.apache.kafka.common.config.ConfigResource
@@ -346,26 +346,6 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Create a test config for a consumer
-   */
-  def createConsumerProperties(zkConnect: String, groupId: String, consumerId: String,
-                               consumerTimeout: Long = -1): Properties = {
-    val props = new Properties
-    props.put("zookeeper.connect", zkConnect)
-    props.put("group.id", groupId)
-    props.put("consumer.id", consumerId)
-    props.put("consumer.timeout.ms", consumerTimeout.toString)
-    props.put("zookeeper.session.timeout.ms", "6000")
-    props.put("zookeeper.sync.time.ms", "200")
-    props.put("auto.commit.interval.ms", "1000")
-    props.put("rebalance.max.retries", "4")
-    props.put("auto.offset.reset", "smallest")
-    props.put("num.consumer.fetchers", "2")
-
-    props
-  }
-
-  /**
    * Fail a test case explicitly. Return Nothing so that we are not constrained by the return type.
    */
   def fail(msg: String): Nothing = throw new AssertionError(msg)
@@ -522,11 +502,11 @@ object TestUtils extends Logging {
   }
 
   def securityConfigs(mode: Mode,
-                              securityProtocol: SecurityProtocol,
-                              trustStoreFile: Option[File],
-                              certAlias: String,
-                              certCn: String,
-                              saslProperties: Option[Properties]): Properties = {
+                      securityProtocol: SecurityProtocol,
+                      trustStoreFile: Option[File],
+                      certAlias: String,
+                      certCn: String,
+                      saslProperties: Option[Properties]): Properties = {
     val props = new Properties
     if (usesSslTransportLayer(securityProtocol))
       props ++= sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias, certCn)
@@ -537,7 +517,9 @@ object TestUtils extends Logging {
     props
   }
 
-  def producerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
+  def producerSecurityConfigs(securityProtocol: SecurityProtocol,
+                              trustStoreFile: Option[File],
+                              saslProperties: Option[Properties]): Properties =
     securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "producer", SslCertificateCn, saslProperties)
 
   /**
@@ -550,15 +532,15 @@ object TestUtils extends Logging {
                            retries: Int = Int.MaxValue,
                            deliveryTimeoutMs: Int = 30 * 1000,
                            lingerMs: Int = 0,
+                           batchSize: Int = 16384,
+                           compressionType: String = "none",
                            requestTimeoutMs: Int = 20 * 1000,
                            securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                            trustStoreFile: Option[File] = None,
                            saslProperties: Option[Properties] = None,
                            keySerializer: Serializer[K] = new ByteArraySerializer,
-                           valueSerializer: Serializer[V] = new ByteArraySerializer,
-                           props: Option[Properties] = None): KafkaProducer[K, V] = {
-
-    val producerProps = props.getOrElse(new Properties)
+                           valueSerializer: Serializer[V] = new ByteArraySerializer): KafkaProducer[K, V] = {
+    val producerProps = new Properties
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
     producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
@@ -567,27 +549,9 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString)
     producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString)
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
-
-    /* Only use these if not already set */
-    val defaultProps = Map(
-      ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
-      ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200",
-      ProducerConfig.LINGER_MS_CONFIG -> lingerMs.toString
-    )
-
-    defaultProps.foreach { case (key, value) =>
-      if (!producerProps.containsKey(key)) producerProps.put(key, value)
-    }
-
-    /*
-     * It uses CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to determine whether
-     * securityConfigs has been invoked already. For example, we need to
-     * invoke it before this call in IntegrationTestHarness, otherwise the
-     * SSL client auth fails.
-     */
-    if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
-      producerProps ++= producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
-
+    producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
+    producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
+    producerProps ++= producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
     new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
   }
 
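With the `props` escape hatch removed, a test that previously tuned batching or compression through a Properties override would now pass the new named parameters directly. A minimal sketch of an updated call site, assuming a `servers: Seq[KafkaServer]` provided by the surrounding harness (the topic, keys, values, and parameter values are illustrative, not from the patch):

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer

    val producer = TestUtils.createProducer(
      TestUtils.getBrokerListStrFromServers(servers),
      acks = -1,
      lingerMs = 50,
      batchSize = 32768,          // was props.put(ProducerConfig.BATCH_SIZE_CONFIG, "32768")
      compressionType = "lz4",    // was props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
      keySerializer = new StringSerializer,
      valueSerializer = new StringSerializer)
    try {
      // block on the send so a broker-side failure surfaces in the test
      producer.send(new ProducerRecord("test-topic", "key", "value")).get()
    } finally {
      producer.close()
    }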
@@ -613,60 +577,25 @@ object TestUtils extends Logging {
   def createConsumer[K, V](brokerList: String,
                            groupId: String = "group",
                            autoOffsetReset: String = "earliest",
-                           partitionFetchSize: Long = 4096L,
-                           partitionAssignmentStrategy: String = classOf[RangeAssignor].getName,
-                           sessionTimeout: Int = 30000,
-                           securityProtocol: SecurityProtocol,
+                           enableAutoCommit: Boolean = true,
+                           readCommitted: Boolean = false,
+                           maxPollRecords: Int = 500,
+                           securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                            trustStoreFile: Option[File] = None,
                            saslProperties: Option[Properties] = None,
                            keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
-                           valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
-                           props: Option[Properties] = None): KafkaConsumer[K, V] = {
-    import org.apache.kafka.clients.consumer.ConsumerConfig
-
-    val consumerProps = props.getOrElse(new Properties())
+                           valueDeserializer: Deserializer[V] = new ByteArrayDeserializer): KafkaConsumer[K, V] = {
+    val consumerProps = new Properties
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
-    consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, partitionFetchSize.toString)
-
-    val defaultProps = Map(
-      ConsumerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
-      ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200",
-      ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> partitionAssignmentStrategy,
-      ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> sessionTimeout.toString,
-      ConsumerConfig.GROUP_ID_CONFIG -> groupId)
-
-    defaultProps.foreach { case (key, value) =>
-      if (!consumerProps.containsKey(key)) consumerProps.put(key, value)
-    }
-
-    /*
-     * It uses CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to determine whether
-     * securityConfigs has been invoked already. For example, we need to
-     * invoke it before this call in IntegrationTestHarness, otherwise the
-     * SSL client auth fails.
-     */
-    if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
-      consumerProps ++= consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
-
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit.toString)
+    consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+    consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, if (readCommitted) "read_committed" else "read_uncommitted")
+    consumerProps ++= consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
     new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)
   }
 
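createConsumer follows the same pattern: the settings tests actually vary (group id, auto commit, isolation level, max poll records) become named parameters, and everything else keeps a fixed default. A hedged sketch of a read-committed consumer, again assuming harness-provided `servers` (the group id and topic name are illustrative):

    import java.time.Duration
    import java.util.Collections

    val consumer = TestUtils.createConsumer[Array[Byte], Array[Byte]](
      TestUtils.getBrokerListStrFromServers(servers),
      groupId = "txn-verify",
      enableAutoCommit = false,   // commit manually once assertions pass
      readCommitted = true)       // becomes isolation.level=read_committed
    try {
      consumer.subscribe(Collections.singleton("output-topic"))
      val records = consumer.poll(Duration.ofSeconds(5))
    } finally {
      consumer.close()
    }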
-  /**
-   * Create a default producer config properties map with the given metadata broker list
-   */
-  def getProducerConfig(brokerList: String): Properties = {
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    props.put("message.send.max.retries", "5")
-    props.put("retry.backoff.ms", "1000")
-    props.put("request.timeout.ms", "2000")
-    props.put("request.required.acks", "-1")
-    props.put("send.buffer.bytes", "65536")
-
-    props
-  }
-
   def createBrokersInZk(zkClient: KafkaZkClient, ids: Seq[Int]): Seq[Broker] =
     createBrokersInZk(ids.map(kafka.admin.BrokerMetadata(_, None)), zkClient)
 
@@ -1227,8 +1156,10 @@ object TestUtils extends Logging {
                                 securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT,
                                 trustStoreFile: Option[File] = None,
                                 waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
-    val consumer = createConsumer(TestUtils.getBrokerListStrFromServers(servers, securityProtocol), groupId = groupId,
-      securityProtocol = securityProtocol, trustStoreFile = trustStoreFile)
+    val consumer = createConsumer(TestUtils.getBrokerListStrFromServers(servers, securityProtocol),
+      groupId = groupId,
+      securityProtocol = securityProtocol,
+      trustStoreFile = trustStoreFile)
     try {
       consumer.subscribe(Collections.singleton(topic))
       consumeRecords(consumer, numMessages, waitTime)
@@ -1264,21 +1195,26 @@ object TestUtils extends Logging {
     records
   }
 
-  def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer], batchSize: Int = 16384,
+  def createTransactionalProducer(transactionalId: String,
+                                  servers: Seq[KafkaServer],
+                                  batchSize: Int = 16384,
                                   transactionTimeoutMs: Long = 60000) = {
     val props = new Properties()
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
+    props.put(ProducerConfig.ACKS_CONFIG, "all")
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs.toString)
-    TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), acks = -1, props = Some(props))
+    new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
   }
 
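Because createTransactionalProducer now constructs the KafkaProducer directly instead of routing through createProducer with a props override, its contract is self-contained: acks=all, idempotence, and the transactional id are always set, and batchSize is the only batching knob. Call sites are unchanged; a sketch of typical usage (the transactional id and topic are illustrative):

    import org.apache.kafka.clients.producer.ProducerRecord

    val producer = TestUtils.createTransactionalProducer("my-txn", servers)
    try {
      producer.initTransactions()
      producer.beginTransaction()
      producer.send(new ProducerRecord("output-topic", "k".getBytes, "v".getBytes))
      producer.commitTransaction()
    } finally {
      producer.close()
    }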
   // Seeds the given topic with records with keys and values in the range [0..numRecords)
   def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Unit = {
     val props = new Properties()
     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), acks = -1, props = Some(props))
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
+    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
     try {
       for (i <- 0 until numRecords) {
         producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, asBytes(i.toString), asBytes(i.toString)))


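seedTopicWithNumberedRecords (truncated above) builds its idempotent producer the same direct way. As a closing sketch, a transactions test would seed input and then read back with the explicit helpers; names and counts here are illustrative:

    TestUtils.seedTopicWithNumberedRecords("input-topic", 100, servers)
    // ... run the code under test, then verify with a read-committed consumer:
    val consumer = TestUtils.createConsumer[Array[Byte], Array[Byte]](
      TestUtils.getBrokerListStrFromServers(servers),
      readCommitted = true)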