kafka-commits mailing list archives

From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Make the build compile with Scala 2.13 (#6989)
Date Tue, 02 Jul 2019 13:30:20 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6dd4ebc  MINOR: Make the build compile with Scala 2.13 (#6989)
6dd4ebc is described below

commit 6dd4ebcea78f9c6fb8a2921cab3fd92be4978063
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Tue Jul 2 06:29:39 2019 -0700

    MINOR: Make the build compile with Scala 2.13 (#6989)
    
    Scala 2.13 support was added to the build via #5454. This PR adjusts the code so that
    it compiles with 2.11, 2.12 and 2.13.
    
    Changes:
    * Add `scala-collection-compat` dependency.
    * Import `scala.collection.Seq` in a number of places for consistent behavior between
    Scala 2.11, 2.12 and 2.13.
    * Remove wildcard imports that were causing the Java classes to have priority over the
    Scala ones, related Scala issue: https://github.com/scala/scala/pull/6589.
    * Replace parallel collection usage with `Future`. The former is no longer included by
    default in the standard library.
    * Replace the `val _: Unit` workaround with one that is more concise and works with Scala 2.13.
    * Replace `filterKeys` with `filter` when we expect a `Map`. `filterKeys` returns a view
    that doesn't implement the `Map` trait in Scala 2.13.
    * Replace `mapValues` with `map` or add a `toMap` as an additional transformation
    when we expect a `Map`. `mapValues` returns a view that doesn't implement the
    `Map` trait in Scala 2.13.
    * Replace `breakOut` with `iterator` and `to`; `breakOut` was removed in Scala
    2.13.
    * Replace `to()` with `toMap`, `toIndexedSeq` and `toSet`.
    * Replace `mutable.Buffer.--` with `filterNot`.
    * `ControlThrowable` is an abstract class in Scala 2.13.
    * Variable arguments can only receive arrays or immutable.Seq in Scala 2.13.
    * Use `Factory` instead of `CanBuildFrom` in DecodeJson. `CanBuildFrom` behaves
    a bit differently in Scala 2.13 and it's been deprecated. `Factory` has the behavior
    we need and it's available via the compat library.
    * Fix failing tests due to behavior change in Scala 2.13,
    "Map.values.map is not strict in Scala 2.13" (https://github.com/scala/bug/issues/11589).
    * Use Java collections instead of Scala ones in StreamsResetter (a Java class).
    * Adjust CheckpointFile.write to take an `Iterable` instead of `Seq` to avoid
    unnecessary collection copies.
    * Fix DelayedElectLeader to use a Map instead of Set and avoid `to` call that
    doesn't work in Scala 2.13.
    * Use unordered map for mapping in SimpleAclAuthorizer; mapping of ordered
    maps requires an `Ordering` in Scala 2.13 for safety reasons.
    * Adapt `ConsumerGroupCommand` to compile with Scala 2.13.
    * CoreUtils.min takes an `Iterable` instead of `TraversableOnce`; the latter does
    not exist in Scala 2.13.
    * Replace `Unit` with `()` in a couple of places. Scala 2.13 is stricter when it expects
    a value instead of a type.
    * Fix bug in CustomQuotaCallbackTest where we did not necessarily set `partitionRatio`
    correctly because `forall` can terminate early.
    * Add a couple of spotbugs exclusions that are needed by code generated by Scala 2.13.
    * Remove unused variables, simplify some code and remove procedure syntax in a few
    places.
    * Remove unused `CoreUtils.JSONEscapeString`.
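
The three most frequent rewrites in the list above (`filterKeys` to `filter`, `mapValues` to `map`, and `breakOut` to `iterator` plus a `toX` call) can be sketched as follows. This is a minimal illustration, not code from this commit: the object name `CollectionMigrationSketch` and all sample data are hypothetical, and the snippet is written so that it should compile unchanged on Scala 2.12 and 2.13 without the compat library.

```scala
// Hypothetical sketch of three collection rewrites described in the commit message.
object CollectionMigrationSketch {

  // Made-up sample data standing in for broker configs.
  val configs: Map[String, String] = Map(
    "password.encoder.secret" -> "changeit",
    "log.retention.hours" -> "168"
  )

  // Instead of `configs.filterKeys(_.startsWith(...))`, which returns a view
  // in Scala 2.13: filter on the (key, value) pair to get a strict Map.
  val encoderConfigs: Map[String, String] =
    configs.filter { case (key, _) => key.startsWith("password.encoder.") }

  // Instead of `configs.mapValues(_.length)`, which also returns a view in
  // Scala 2.13: map over the pairs and rebuild each entry explicitly.
  val valueLengths: Map[String, Int] =
    configs.map { case (key, value) => key -> value.length }

  // Made-up sample data: topic name -> partition ids.
  val topics: Seq[(String, Seq[Int])] = Seq("a" -> Seq(0, 1), "b" -> Seq(0))

  // Instead of `topics.flatMap { ... }(breakOut)`: go through an iterator so
  // no intermediate collection is built, then materialize the target type.
  val partitions: Set[(String, Int)] =
    topics.iterator.flatMap { case (topic, ids) => ids.map(id => (topic, id)) }.toSet

  def main(args: Array[String]): Unit = {
    println(encoderConfigs)
    println(valueLengths)
    println(partitions)
  }
}
```

Where the view-based call is kept, as in the LogDirsCommand change in this diff, appending `.toMap` is the other option the commit message mentions.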
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
---
 build.gradle                                       |  1 +
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  2 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 31 ++++++++---------
 .../scala/kafka/admin/DeleteRecordsCommand.scala   |  1 +
 .../scala/kafka/admin/LeaderElectionCommand.scala  |  4 +--
 .../main/scala/kafka/admin/LogDirsCommand.scala    |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala    |  4 +--
 .../kafka/admin/ReassignPartitionsCommand.scala    |  3 +-
 core/src/main/scala/kafka/api/package.scala        |  5 ++-
 core/src/main/scala/kafka/cluster/Broker.scala     |  2 ++
 core/src/main/scala/kafka/cluster/Partition.scala  |  2 +-
 .../common/ZkNodeChangeNotificationListener.scala  |  1 +
 .../controller/ControllerChannelManager.scala      |  2 +-
 .../scala/kafka/controller/ControllerContext.scala |  4 +--
 .../src/main/scala/kafka/controller/Election.scala |  2 ++
 .../scala/kafka/controller/KafkaController.scala   | 24 ++++++-------
 .../kafka/controller/PartitionStateMachine.scala   |  9 +++--
 .../kafka/controller/ReplicaStateMachine.scala     | 13 ++++---
 .../kafka/coordinator/group/GroupCoordinator.scala | 16 ++++-----
 .../coordinator/group/GroupMetadataManager.scala   |  8 ++---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  2 +-
 .../main/scala/kafka/log/LogCleanerManager.scala   |  6 ++--
 core/src/main/scala/kafka/log/LogManager.scala     | 10 +++---
 core/src/main/scala/kafka/log/LogValidator.scala   |  2 +-
 .../scala/kafka/metrics/KafkaMetricsReporter.scala |  1 +
 .../kafka/security/auth/SimpleAclAuthorizer.scala  | 19 ++++++----
 .../scala/kafka/server/AbstractFetcherThread.scala |  2 +-
 .../src/main/scala/kafka/server/AdminManager.scala | 25 +++++---------
 .../scala/kafka/server/ClientQuotaManager.scala    |  2 +-
 .../main/scala/kafka/server/ConfigHandler.scala    |  1 +
 .../scala/kafka/server/DelayedDeleteRecords.scala  |  2 +-
 .../scala/kafka/server/DelayedElectLeader.scala    | 10 +++---
 .../main/scala/kafka/server/DelayedProduce.scala   |  2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  8 ++---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 31 ++++++++---------
 core/src/main/scala/kafka/server/KafkaConfig.scala |  2 +-
 .../scala/kafka/server/KafkaServerStartable.scala  |  2 ++
 .../main/scala/kafka/server/ReplicaManager.scala   | 18 +++++-----
 .../kafka/server/ReplicationQuotaManager.scala     |  4 ++-
 .../kafka/server/checkpoints/CheckpointFile.scala  |  2 +-
 .../server/checkpoints/OffsetCheckpointFile.scala  |  2 +-
 .../kafka/server/epoch/LeaderEpochFileCache.scala  | 10 ++++--
 .../main/scala/kafka/tools/GetOffsetShell.scala    |  5 ++-
 .../kafka/tools/ReplicaVerificationTool.scala      |  1 +
 .../main/scala/kafka/tools/StreamsResetter.java    | 40 +++++++++++++---------
 core/src/main/scala/kafka/utils/CoreUtils.scala    | 33 +++---------------
 .../main/scala/kafka/utils/json/DecodeJson.scala   | 16 ++++-----
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 18 +++++-----
 core/src/main/scala/kafka/zk/ZkData.scala          |  6 ++--
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    |  1 +
 .../kafka/admin/BrokerApiVersionsCommandTest.scala |  4 ++-
 .../admin/ReassignPartitionsIntegrationTest.scala  |  2 +-
 .../kafka/api/AdminClientIntegrationTest.scala     |  1 +
 .../integration/kafka/api/BaseConsumerTest.scala   |  1 +
 .../integration/kafka/api/ConsumerBounceTest.scala |  1 +
 .../kafka/api/CustomQuotaCallbackTest.scala        | 14 ++++----
 .../kafka/api/IntegrationTestHarness.scala         |  1 +
 .../kafka/api/SaslPlainPlaintextConsumerTest.scala |  2 ++
 .../SaslPlainSslEndToEndAuthorizationTest.scala    |  2 ++
 .../scala/integration/kafka/api/SaslSetup.scala    |  2 ++
 .../integration/kafka/api/TransactionsTest.scala   |  1 +
 .../kafka/server/DelayedFetchTest.scala            |  2 ++
 .../server/DynamicBrokerReconfigurationTest.scala  |  6 ++--
 ...pleListenersWithAdditionalJaasContextTest.scala |  2 ++
 ...ltipleListenersWithDefaultJaasContextTest.scala |  2 ++
 ...ListenersWithSameSecurityProtocolBaseTest.scala |  1 +
 .../kafka/tools/MirrorMakerIntegrationTest.scala   |  2 ++
 .../unit/kafka/admin/AdminRackAwareTest.scala      |  6 ++--
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |  5 ++-
 .../kafka/admin/LeaderElectionCommandTest.scala    |  1 +
 ...PreferredReplicaLeaderElectionCommandTest.scala |  4 ++-
 .../admin/ReassignPartitionsCommandTest.scala      |  1 +
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 22 ++++++------
 .../admin/TopicCommandWithAdminClientTest.scala    |  1 +
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  2 +-
 .../ZkNodeChangeNotificationListenerTest.scala     |  1 +
 .../controller/ControllerChannelManagerTest.scala  | 22 +++++++-----
 .../controller/ControllerEventManagerTest.scala    |  2 +-
 .../controller/ControllerIntegrationTest.scala     |  1 +
 .../controller/MockPartitionStateMachine.scala     |  4 +--
 .../kafka/controller/MockReplicaStateMachine.scala |  2 ++
 .../controller/PartitionStateMachineTest.scala     |  5 ++-
 .../group/GroupMetadataManagerTest.scala           | 28 +++++++--------
 .../TransactionCoordinatorConcurrencyTest.scala    |  2 +-
 .../transaction/TransactionStateManagerTest.scala  |  4 +--
 .../kafka/integration/KafkaServerTestHarness.scala |  1 +
 .../integration/UncleanLeaderElectionTest.scala    |  1 +
 .../LogCleanerParameterizedIntegrationTest.scala   |  1 -
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  6 ++--
 .../unit/kafka/network/SocketServerTest.scala      | 17 ++++-----
 .../unit/kafka/server/AdvertiseBrokerTest.scala    |  2 +-
 .../server/AlterReplicaLogDirsRequestTest.scala    |  2 +-
 .../scala/unit/kafka/server/BaseRequestTest.scala  |  2 ++
 .../server/CreateTopicsRequestWithPolicyTest.scala |  2 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |  2 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala |  2 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |  1 +
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  2 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |  8 +++--
 .../unit/kafka/server/MetadataRequestTest.scala    |  1 +
 .../server/ReplicaAlterLogDirsThreadTest.scala     |  1 -
 .../scala/unit/kafka/server/ReplicaFetchTest.scala |  2 ++
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  1 -
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  5 +--
 .../unit/kafka/server/ReplicaManagerTest.scala     |  2 +-
 .../kafka/server/ReplicationQuotaManagerTest.scala |  2 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |  2 +-
 .../kafka/server/ServerGenerateBrokerIdTest.scala  |  2 ++
 .../server/epoch/LeaderEpochFileCacheTest.scala    |  5 +--
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  2 +-
 .../scala/unit/kafka/utils/JaasTestUtils.scala     |  3 ++
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 14 +++++---
 .../unit/kafka/utils/json/JsonValueTest.scala      |  2 ++
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |  2 +-
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala    |  2 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  7 ++--
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala |  2 ++
 gradle/dependencies.gradle                         |  6 ++--
 gradle/spotbugs-exclude.xml                        |  6 +++-
 .../streams/scala/kstream/KGroupedStream.scala     |  8 ++++-
 .../kafka/streams/scala/kstream/KStream.scala      | 10 +++++-
 .../kafka/streams/scala/kstream/KTable.scala       |  2 +-
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |  2 +-
 .../apache/kafka/streams/scala/TopologyTest.scala  | 22 ++++++------
 .../apache/kafka/streams/scala/WordCountTest.scala |  5 ++-
 125 files changed, 413 insertions(+), 332 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5da2e55..a56bd67 100644
--- a/build.gradle
+++ b/build.gradle
@@ -667,6 +667,7 @@ project(':core') {
     compile libs.jacksonJDK8Datatypes
     compile libs.joptSimple
     compile libs.metrics
+    compile libs.scalaCollectionCompat
     compile libs.scalaJava8Compat
     compile libs.scalaLibrary
     // only needed transitively, but set it explicitly to ensure it has the same version as scala-library
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 63ee8eb..09c4456 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -194,7 +194,7 @@ object ConfigCommand extends Config {
    */
   private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean) {
     val passwordEncoderConfigs = new Properties
-    passwordEncoderConfigs ++= configsToBeAdded.asScala.filterKeys(_.startsWith("password.encoder."))
+    passwordEncoderConfigs ++= configsToBeAdded.asScala.filter { case (key, _) => key.startsWith("password.encoder.") }
     if (!passwordEncoderConfigs.isEmpty) {
       info(s"Password encoder configs ${passwordEncoderConfigs.keySet} will be used for encrypting" +
         " passwords, but will not be stored in ZooKeeper.")
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 1ca3515..0277d9b 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
-import scala.collection.{Seq, Set, mutable}
+import scala.collection.{immutable, Map, Seq, Set, mutable}
 import scala.util.{Failure, Success, Try}
 import joptsimple.OptionSpec
 import scala.collection.immutable.TreeMap
@@ -374,7 +374,7 @@ object ConsumerGroupCommand extends Logging {
       ).describedGroups()
 
       val result =
-        consumerGroups.asScala.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) {
+        consumerGroups.asScala.foldLeft(immutable.Map[String, Map[TopicPartition, OffsetAndMetadata]]()) {
           case (acc, (groupId, groupDescription)) =>
             groupDescription.get.state().toString match {
               case "Empty" | "Dead" =>
@@ -418,7 +418,7 @@ object ConsumerGroupCommand extends Logging {
 
       val groupOffsets = TreeMap[String, (Option[String], Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- consumerGroups) yield {
         val state = consumerGroup.state
-        val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
+        val committedOffsets = getCommittedOffsets(groupId)
         var assignedTopicPartitions = ListBuffer[TopicPartition]()
         val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
           .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
@@ -441,7 +441,7 @@ object ConsumerGroupCommand extends Logging {
               Map(topicPartition -> Some(offset.offset)),
               Some(MISSING_COLUMN_VALUE),
               Some(MISSING_COLUMN_VALUE),
-              Some(MISSING_COLUMN_VALUE))
+              Some(MISSING_COLUMN_VALUE)).toSeq
         }
         groupId -> (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
       }).toMap
@@ -576,8 +576,7 @@ object ConsumerGroupCommand extends Logging {
 
     private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = {
       if (opts.options.has(opts.allTopicsOpt)) {
-        val allTopicPartitions = getCommittedOffsets(groupId).keySet().asScala.toSeq
-        allTopicPartitions
+        getCommittedOffsets(groupId).keys.toSeq
       } else if (opts.options.has(opts.topicOpt)) {
         val topics = opts.options.valuesOf(opts.topicOpt).asScala
         parseTopicPartitionsToReset(groupId, topics)
@@ -589,19 +588,19 @@ object ConsumerGroupCommand extends Logging {
       }
     }
 
-    private def getCommittedOffsets(groupId: String) = {
+    private def getCommittedOffsets(groupId: String): Map[TopicPartition, OffsetAndMetadata] = {
       adminClient.listConsumerGroupOffsets(
         groupId,
         withTimeoutMs(new ListConsumerGroupOffsetsOptions)
-      ).partitionsToOffsetAndMetadata.get
+      ).partitionsToOffsetAndMetadata.get.asScala
     }
 
-    type GroupMetadata = Map[String, Map[TopicPartition, OffsetAndMetadata]]
+    type GroupMetadata = immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]]
     private def parseResetPlan(resetPlanCsv: String): GroupMetadata = {
       def updateGroupMetadata(group: String, topic: String, partition: Int, offset: Long, acc: GroupMetadata) = {
         val topicPartition = new TopicPartition(topic, partition)
         val offsetAndMetadata = new OffsetAndMetadata(offset)
-        val dataMap = acc.getOrElse(group, Map()).updated(topicPartition, offsetAndMetadata)
+        val dataMap = acc.getOrElse(group, immutable.Map()).updated(topicPartition, offsetAndMetadata)
         acc.updated(group, dataMap)
       }
       val csvReader = CsvUtils().readerFor[CsvRecordNoGroup]
@@ -612,14 +611,14 @@ object ConsumerGroupCommand extends Logging {
       // Single group CSV format: "topic,partition,offset"
       val dataMap = if (isSingleGroupQuery && isOldCsvFormat) {
         val group = opts.options.valueOf(opts.groupOpt)
-        lines.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) =>
+        lines.foldLeft(immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) =>
           val CsvRecordNoGroup(topic, partition, offset) = csvReader.readValue[CsvRecordNoGroup](line)
           updateGroupMetadata(group, topic, partition, offset, acc)
         }
         // Multiple group CSV format: "group,topic,partition,offset"
       } else {
         val csvReader = CsvUtils().readerFor[CsvRecordWithGroup]
-        lines.foldLeft(Map[String, Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) =>
+        lines.foldLeft(immutable.Map[String, immutable.Map[TopicPartition, OffsetAndMetadata]]()) { (acc, line) =>
           val CsvRecordWithGroup(group, topic, partition, offset) = csvReader.readValue[CsvRecordWithGroup](line)
           updateGroupMetadata(group, topic, partition, offset, acc)
         }
@@ -654,7 +653,7 @@ object ConsumerGroupCommand extends Logging {
         val currentCommittedOffsets = getCommittedOffsets(groupId)
         val requestedOffsets = partitionsToReset.map { topicPartition =>
           val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
-          val currentOffset = currentCommittedOffsets.asScala.getOrElse(topicPartition,
+          val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
             throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset")).offset
           (topicPartition, currentOffset + shiftBy)
         }.toMap
@@ -706,8 +705,8 @@ object ConsumerGroupCommand extends Logging {
 
         val preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map { topicPartition =>
           (topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match {
-            case offset if offset != null => offset.offset
-            case _ => throw new IllegalStateException(s"Expected a valid current offset for topic partition: $topicPartition")
+            case Some(offset) => offset.offset
+            case None => throw new IllegalStateException(s"Expected a valid current offset for topic partition: $topicPartition")
           }))
         }.toMap
 
@@ -756,7 +755,7 @@ object ConsumerGroupCommand extends Logging {
             if (isSingleGroupQuery) CsvRecordNoGroup(k.topic, k.partition, v.offset)
             else CsvRecordWithGroup(groupId, k.topic, k.partition, v.offset)
           csvWriter.writeValueAsString(csvRecord)
-        }(collection.breakOut): List[String]
+        }
       }
       rows.mkString("")
     }
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 10977c2..600aef6 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 
 /**
  * A command for delete records of the given partitions down to the specified offset.
diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
index 3d17795..e484dd3 100644
--- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
@@ -148,8 +148,8 @@ object LeaderElectionCommand extends Logging {
     val noop = mutable.Set.empty[TopicPartition]
     val failed = mutable.Map.empty[TopicPartition, Throwable]
 
-    electionResults.foreach { case (topicPartition, error) =>
-      val _: Unit = if (error.isPresent) {
+    electionResults.foreach[Unit] { case (topicPartition, error) =>
+      if (error.isPresent) {
         error.get match {
           case _: ElectionNotNeededException => noop += topicPartition
           case _ => failed += topicPartition -> error.get
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index 3beb29b..e689b12 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -48,7 +48,7 @@ object LogDirsCommand {
 
         out.println("Querying brokers for log directories information")
         val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.mapValues(_.asScala)
+        val logDirInfosByBroker = describeLogDirsResult.all.get().asScala.mapValues(_.asScala).toMap
 
         out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
         out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 15242f7..cb8c899 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -243,8 +243,8 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       val noop = mutable.Set.empty[TopicPartition]
       val failed = mutable.Map.empty[TopicPartition, Throwable]
 
-      electionResults.foreach { case (topicPartition, error) =>
-        val _: Unit = if (error.isPresent) {
+      electionResults.foreach[Unit] { case (topicPartition, error) =>
+        if (error.isPresent) {
           if (error.get.isInstanceOf[ElectionNotNeededException]) {
             noop += topicPartition
           } else {
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index c108f07..4d0f071 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -640,7 +640,8 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
         val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica]
         while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) {
           replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable(
-            proposedReplicaAssignment.filterKeys(replica => !replicasAssignedToFutureDir.contains(replica)), adminClientOpt.get, remainingTimeMs)
+            proposedReplicaAssignment.filter { case (replica, _) => !replicasAssignedToFutureDir.contains(replica) },
+            adminClientOpt.get, remainingTimeMs)
           Thread.sleep(100)
           remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
         }
diff --git a/core/src/main/scala/kafka/api/package.scala b/core/src/main/scala/kafka/api/package.scala
index aa2fdfd..98d0045 100644
--- a/core/src/main/scala/kafka/api/package.scala
+++ b/core/src/main/scala/kafka/api/package.scala
@@ -20,7 +20,6 @@ import org.apache.kafka.common.ElectionType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.ElectLeadersRequest
 import scala.collection.JavaConverters._
-import scala.collection.breakOut
 
 package object api {
   implicit final class ElectLeadersRequestOps(val self: ElectLeadersRequest) extends AnyVal {
@@ -28,11 +27,11 @@ package object api {
       if (self.data.topicPartitions == null) {
         Set.empty
       } else {
-        self.data.topicPartitions.asScala.flatMap { topicPartition =>
+        self.data.topicPartitions.asScala.iterator.flatMap { topicPartition =>
           topicPartition.partitionId.asScala.map { partitionId =>
             new TopicPartition(topicPartition.topic, partitionId)
           }
-        }(breakOut)
+        }.toSet
       }
     }
 
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 0106982..d5ee82c 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -22,6 +22,8 @@ import org.apache.kafka.common.Node
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+import scala.collection.Seq
+
 /**
  * A Kafka broker.
  * A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map.
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 0574cae..160c1d0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{Map, Seq}
 
 trait PartitionStateStore {
   def fetchTopicConfig(): Properties
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 65c3506..9a160e6 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -25,6 +25,7 @@ import kafka.zk.{KafkaZkClient, StateChangeHandlers}
 import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler}
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.Seq
 import scala.util.{Failure, Try}
 
 /**
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3c46036..3719a31 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicParti
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.collection.{Set, mutable}
+import scala.collection.{Seq, Set, mutable}
 
 object ControllerChannelManager {
   val QueueSizeMetricName = "QueueSize"
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index e41c899..38443a7 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -20,7 +20,7 @@ package kafka.controller
 import kafka.cluster.Broker
 import org.apache.kafka.common.TopicPartition
 
-import scala.collection.{Seq, Set, mutable}
+import scala.collection.{Map, Seq, Set, mutable}
 
 class ControllerContext {
   val stats = new ControllerStats
@@ -113,7 +113,7 @@ class ControllerContext {
 
   def removeLiveBrokers(brokerIds: Set[Int]): Unit = {
     liveBrokers = liveBrokers.filter(broker => !brokerIds.contains(broker.id))
-    liveBrokerEpochs = liveBrokerEpochs.filterKeys(id => !brokerIds.contains(id))
+    liveBrokerEpochs = liveBrokerEpochs.filter { case (id, _) => !brokerIds.contains(id) }
   }
 
   def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {
diff --git a/core/src/main/scala/kafka/controller/Election.scala b/core/src/main/scala/kafka/controller/Election.scala
index e30f69a..163f916 100644
--- a/core/src/main/scala/kafka/controller/Election.scala
+++ b/core/src/main/scala/kafka/controller/Election.scala
@@ -19,6 +19,8 @@ package kafka.controller
 import kafka.api.LeaderAndIsr
 import org.apache.kafka.common.TopicPartition
 
+import scala.collection.Seq
+
 case class ElectionResult(topicPartition: TopicPartition, leaderAndIsr: Option[LeaderAndIsr], liveReplicas: Seq[Int])
 
 object Election {
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 8abb26a..75aeba2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -41,7 +41,7 @@ import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
 
 import scala.collection.JavaConverters._
-import scala.collection._
+import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.util.{Failure, Try}
 
 sealed trait ElectionTrigger
@@ -897,7 +897,7 @@ class KafkaController(val config: KafkaConfig,
       // Ensure we detect future reassignments
       eventManager.put(PartitionReassignment)
     } else {
-      val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
+      val reassignment = updatedPartitionsBeingReassigned.map { case (k, v) => k -> v.newReplicas }
       try zkClient.setOrCreatePartitionReassignment(reassignment, controllerContext.epochZkVersion)
       catch {
         case e: KeeperException => throw new AdminOperationException(e)
@@ -1276,8 +1276,8 @@ class KafkaController(val config: KafkaConfig,
     val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
     val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
       .filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
-    val newBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => newBrokerIds.contains(broker.id))
-    val bouncedBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => bouncedBrokerIds.contains(broker.id))
+    val newBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => newBrokerIds.contains(broker.id) }
+    val bouncedBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => bouncedBrokerIds.contains(broker.id) }
     val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
     val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
     val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
@@ -1358,7 +1358,7 @@ class KafkaController(val config: KafkaConfig,
   }
 
   private def processPartitionModifications(topic: String): Unit = {
-    def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment : immutable.Map[TopicPartition, Seq[Int]]): Unit = {
+    def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment: Map[TopicPartition, Seq[Int]]): Unit = {
       info("Restoring the partition replica assignment for topic %s".format(topic))
 
       val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
@@ -1505,7 +1505,7 @@ class KafkaController(val config: KafkaConfig,
   ): Unit = {
     callback(
       partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions =>
-        partitions.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null)))(breakOut)
+        partitions.iterator.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null))).toMap
       }
     )
   }
@@ -1518,7 +1518,7 @@ class KafkaController(val config: KafkaConfig,
   ): Unit = {
     if (!isActive) {
       callback(partitionsFromAdminClientOpt.fold(Map.empty[TopicPartition, Either[ApiError, Int]]) { partitions =>
-        partitions.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null)))(breakOut)
+        partitions.iterator.map(partition => partition -> Left(new ApiError(Errors.NOT_CONTROLLER, null))).toMap
       })
     } else {
       // We need to register the watcher if the path doesn't exist in order to detect future preferred replica
@@ -1556,19 +1556,19 @@ class KafkaController(val config: KafkaConfig,
           }
         }
 
-        val results = onReplicaElection(electablePartitions, electionType, electionTrigger).mapValues {
-          case Left(ex) =>
+        val results = onReplicaElection(electablePartitions, electionType, electionTrigger).map {
+          case (k, Left(ex)) =>
             if (ex.isInstanceOf[StateChangeFailedException]) {
               val error = if (electionType == ElectionType.PREFERRED) {
                 Errors.PREFERRED_LEADER_NOT_AVAILABLE
               } else {
                 Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
               }
-              Left(new ApiError(error, ex.getMessage))
+              k -> Left(new ApiError(error, ex.getMessage))
             } else {
-              Left(ApiError.fromThrowable(ex))
+              k -> Left(ApiError.fromThrowable(ex))
             }
-          case Right(leaderAndIsr) => Right(leaderAndIsr.leader)
+          case (k, Right(leaderAndIsr)) => k -> Right(leaderAndIsr.leader)
         } ++
         alreadyValidLeader.map(_ -> Left(new ApiError(Errors.ELECTION_NOT_NEEDED))) ++
         partitionsBeingDeleted.map(
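
Note on the `mapValues` replacements in this file: per the commit message, `mapValues` returns a view in Scala 2.13 that doesn't implement the `Map` trait, so the hunks above rebuild the map with `map` and an explicit key. A minimal sketch of the pattern, assuming a hypothetical `Assignment` case class as a stand-in for the controller's reassignment state (it is not Kafka's actual type):

```scala
// Hypothetical stand-in for the per-partition reassignment state.
final case class Assignment(newReplicas: Seq[Int])

object MapValuesMigration {
  val reassignments: Map[String, Assignment] =
    Map("topic-0" -> Assignment(Seq(1, 2)), "topic-1" -> Assignment(Seq(3)))

  // Strict replacement for reassignments.mapValues(_.newReplicas):
  // builds a new Map immediately and compiles on 2.11, 2.12 and 2.13.
  val newReplicas: Map[String, Seq[Int]] =
    reassignments.map { case (k, v) => k -> v.newReplicas }
}
```

The result is a strict `Map`, so the transformation runs once instead of on every access, as it would with a view.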
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index fca78ef..8669f73 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -28,8 +28,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
-import scala.collection.breakOut
-import scala.collection.mutable
+import scala.collection.{Map, Seq, mutable}
 
 abstract class PartitionStateMachine(controllerContext: ControllerContext) extends Logging {
   /**
@@ -165,7 +164,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
           throw e
         case e: Throwable =>
           error(s"Error while moving some partitions to $targetState state", e)
-          partitions.map(_ -> Left(e))(breakOut)
+          partitions.iterator.map(_ -> Left(e)).toMap
       }
     } else {
       Map.empty
@@ -368,7 +367,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
       zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
       case e: Exception =>
-        return (partitions.map(_ -> Left(e))(breakOut), Seq.empty)
+        return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
     }
     val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
     val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
@@ -470,7 +469,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
       }
     } else {
       val (logConfigs, failed) = zkClient.getLogConfigs(
-        partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }(breakOut),
+        partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
         config.originals()
       )
 
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index cdc1d22..e74f4dc 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -26,8 +26,7 @@ import kafka.zk.TopicPartitionStateZNode
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.zookeeper.KeeperException.Code
-import scala.collection.breakOut
-import scala.collection.mutable
+import scala.collection.{Seq, mutable}
 
 abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
   /**
@@ -337,7 +336,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
     )
 
     val exceptionsForPartitionsWithNoLeaderAndIsrInZk: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
-      partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
+      partitionsWithNoLeaderAndIsrInZk.iterator.flatMap { partition =>
         if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
           val exception = new StateChangeFailedException(
             s"Failed to change state of replica $replicaId for partition $partition since the leader and isr " +
@@ -345,7 +344,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
           )
           Option(partition -> Left(exception))
         } else None
-      }(breakOut)
+      }.toMap
 
     val leaderIsrAndControllerEpochs: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
       (leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result: Either[Exception, LeaderAndIsr]) =>
@@ -381,15 +380,15 @@ class ZkReplicaStateMachine(config: KafkaConfig,
       zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
       case e: Exception =>
-        return (partitions.map(_ -> Left(e))(breakOut), Seq.empty)
+        return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
     }
 
     val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicPartition]
     val result = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
 
-    getDataResponses.foreach { getDataResponse =>
+    getDataResponses.foreach[Unit] { getDataResponse =>
       val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
-      val _: Unit = if (getDataResponse.resultCode == Code.OK) {
+      if (getDataResponse.resultCode == Code.OK) {
         TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
           case None =>
             partitionsWithNoLeaderAndIsrInZk += partition
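
Note on the `breakOut` replacements in the state machine files: `breakOut` was removed in Scala 2.13, so the hunks above go through `iterator` and finish with `toMap` or `toSet`. This avoids materializing an intermediate collection. A small sketch of the same shape, using plain strings as hypothetical partition names in place of `TopicPartition`:

```scala
object BreakOutMigration {
  // Hypothetical partition names; the real code uses TopicPartition.
  val partitions: Seq[String] = Seq("topic-0", "topic-1")
  val error: Exception = new IllegalStateException("state change failed")

  // Before (2.11/2.12 only): partitions.map(_ -> Left(error))(breakOut)
  // After: map over the iterator, then build the target Map once.
  val failures: Map[String, Either[Exception, Int]] =
    partitions.iterator.map(p => p -> (Left(error): Either[Exception, Int])).toMap

  // The same approach with a Set as the target collection.
  val topics: Set[String] =
    partitions.iterator.map(_.takeWhile(_ != '-')).toSet
}
```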
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 0bd1572..7f6641c 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -557,7 +557,7 @@ class GroupCoordinator(val brokerId: Int,
                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
     validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
-      case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
+      case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
       case None =>
         val group = groupManager.getGroup(groupId).getOrElse {
           groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
@@ -573,7 +573,7 @@ class GroupCoordinator(val brokerId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
     validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
-      case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
+      case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
       case None =>
         groupManager.getGroup(groupId) match {
           case None =>
@@ -584,7 +584,7 @@ class GroupCoordinator(val brokerId: Int,
                 offsetMetadata, responseCallback)
             } else {
               // or this is a request coming from an older generation. either way, reject the commit
-              responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
+              responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
             }
 
           case Some(group) =>
@@ -616,17 +616,17 @@ class GroupCoordinator(val brokerId: Int,
         // from the coordinator metadata; it is likely that the group has migrated to some other
         // coordinator OR the group is in a transient unstable phase. Let the member retry
         // finding the correct coordinator and rejoin.
-        responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
+        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.COORDINATOR_NOT_AVAILABLE })
       } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.FENCED_INSTANCE_ID))
+        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.FENCED_INSTANCE_ID })
       } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
         // The group is only using Kafka to store offsets.
         // Also, for transactional offset commits we don't need to validate group membership and the generation.
         groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
       } else if (!group.has(memberId)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
+        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.UNKNOWN_MEMBER_ID })
       } else if (generationId != group.generationId) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
+        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
       } else {
         group.currentState match {
           case Stable | PreparingRebalance =>
@@ -641,7 +641,7 @@ class GroupCoordinator(val brokerId: Int,
             // but since the consumer's member.id and generation is valid, it means it has received
             // the latest group generation information from the JoinResponse.
             // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully.
-            responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
+            responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.REBALANCE_IN_PROGRESS })
 
           case _ =>
             throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}")
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 40643a4..1efbfa0 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.collection.JavaConverters._
 import scala.collection._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.ArrayBuffer
 
 class GroupMetadataManager(brokerId: Int,
                            interBrokerProtocolVersion: ApiVersion,
@@ -312,7 +312,7 @@ class GroupMetadataManager(brokerId: Int,
     // construct the message set to append
     if (filteredOffsetMetadata.isEmpty) {
       // compute the final error codes for the commit response
-      val commitStatus = offsetMetadata.mapValues(_ => Errors.OFFSET_METADATA_TOO_LARGE)
+      val commitStatus = offsetMetadata.map { case (k, v) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
       responseCallback(commitStatus)
       None
     } else {
@@ -757,7 +757,7 @@ class GroupMetadataManager(brokerId: Int,
         val timestamp = time.milliseconds()
 
           replicaManager.nonOfflinePartition(appendPartition).foreach { partition =>
-            val tombstones = ListBuffer.empty[SimpleRecord]
+            val tombstones = ArrayBuffer.empty[SimpleRecord]
             removedOffsets.foreach { case (topicPartition, offsetAndMetadata) =>
               trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
               val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
@@ -780,7 +780,7 @@ class GroupMetadataManager(brokerId: Int,
               try {
                 // do not need to require acks since even if the tombstone is lost,
                 // it will be appended again in the next purge cycle
-                val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones: _*)
+                val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones.toArray: _*)
                 partition.appendRecordsToLeader(records, isFromClient = false, requiredAcks = 0)
 
                 offsetsRemoved += removedOffsets.size
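
Note on the `tombstones.toArray: _*` change: the commit message does not give the reason for it. Presumably it is because Scala 2.13 expects an immutable sequence for `: _*` expansion, while an array is accepted by all three Scala versions. A minimal sketch, assuming a hypothetical Scala varargs method `total` as a stand-in for the Java `MemoryRecords.withRecords` call:

```scala
import scala.collection.mutable.ArrayBuffer

object VarargsMigration {
  // Hypothetical varargs method; the real call site targets a Java API.
  def total(values: Int*): Int = values.sum

  val buffer: ArrayBuffer[Int] = ArrayBuffer(1, 2, 3)

  // Copying the mutable buffer into an array before expanding it
  // compiles on 2.11, 2.12 and 2.13.
  val sum: Int = total(buffer.toArray: _*)
}
```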
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index cdeb4e6..e09f2dc 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
-import scala.collection.{Iterable, Set, mutable}
+import scala.collection.{Iterable, Seq, Set, mutable}
 import scala.util.control.ControlThrowable
 
 /**
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f8dce22..ed3d526 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.errors.KafkaStorageException
 
-import scala.collection.{Iterable, immutable, mutable}
+import scala.collection.{Iterable, Seq, immutable, mutable}
 
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -356,7 +356,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
-          val existing = checkpoint.read().filterKeys(logs.keys) ++ update
+          val existing = checkpoint.read().filter { case (k, _) => logs.keys.contains(k) } ++ update
           checkpoint.write(existing)
         } catch {
           case e: KafkaStorageException =>
@@ -393,7 +393,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   def handleLogDirFailure(dir: String) {
     info(s"Stopping cleaning logs in dir $dir")
     inLock(lock) {
-      checkpoints = checkpoints.filterKeys(_.getAbsolutePath != dir)
+      checkpoints = checkpoints.filter { case (k, _) => k.getAbsolutePath != dir }
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 6455746..61b8231 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -103,7 +103,7 @@ class LogManager(logDirs: Seq[File],
   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
 
   private def offlineLogDirs: Iterable[File] = {
-    val logDirsSet = mutable.Set[File](logDirs: _*)
+    val logDirsSet = mutable.Set[File]() ++= logDirs
     _liveLogDirs.asScala.foreach(logDirsSet -=)
     logDirsSet
   }
@@ -607,7 +607,7 @@ class LogManager(logDirs: Seq[File],
       partitionToLog <- logsByDir.get(dir.getAbsolutePath)
       checkpoint <- recoveryPointCheckpoints.get(dir)
     } {
-      checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
+      checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
     }
   }
 
@@ -620,9 +620,9 @@ class LogManager(logDirs: Seq[File],
       checkpoint <- logStartOffsetCheckpoints.get(dir)
     } {
       try {
-        val logStartOffsets = partitionToLog.filter { case (_, log) =>
-          log.logStartOffset > log.logSegments.head.baseOffset
-        }.mapValues(_.logStartOffset)
+        val logStartOffsets = partitionToLog.collect {
+          case (k, log) if log.logStartOffset > log.logSegments.head.baseOffset => k -> log.logStartOffset
+        }
         checkpoint.write(logStartOffsets)
       } catch {
         case e: IOException =>
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index d7088a6..775ea6e 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCom
 import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier}
 import org.apache.kafka.common.utils.Time
 
-import scala.collection.mutable
+import scala.collection.{Seq, mutable}
 import scala.collection.JavaConverters._
 
 private[kafka] object LogValidator extends Logging {
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 6d35539..a373af0 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -23,6 +23,7 @@ package kafka.metrics
 import kafka.utils.{CoreUtils, VerifiableProperties}
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.Seq
 import scala.collection.mutable.ArrayBuffer
 
 
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index e39babf..2eba792 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -221,10 +221,12 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
     inReadLock(lock) {
-      aclCache.mapValues { versionedAcls =>
-        versionedAcls.acls.filter(_.principal == principal)
-      }.filter { case (_, acls) =>
-        acls.nonEmpty
+      unorderedAcls.flatMap { case (k, versionedAcls) =>
+        val aclsForPrincipal = versionedAcls.acls.filter(_.principal == principal)
+        if (aclsForPrincipal.nonEmpty)
+          Some(k -> aclsForPrincipal)
+        else
+          None
       }
     }
   }
@@ -243,7 +245,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
         .from(Resource(resourceType, resourceName, PatternType.PREFIXED))
         .to(Resource(resourceType, resourceName.take(1), PatternType.PREFIXED))
         .filterKeys(resource => resourceName.startsWith(resource.name))
-        .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
+        .values
+        .flatMap { _.acls }
         .toSet
 
       prefixed ++ wildcard ++ literal
@@ -252,7 +255,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   override def getAcls(): Map[Resource, Set[Acl]] = {
     inReadLock(lock) {
-      aclCache.mapValues(_.acls)
+      unorderedAcls.map { case (k, v) => k -> v.acls }
     }
   }
 
@@ -356,6 +359,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }
   }
 
+  // Returns Map instead of SortedMap since most callers don't care about ordering. In Scala 2.13, mapping from SortedMap
+  // to Map is restricted by default
+  private def unorderedAcls: Map[Resource, VersionedAcls] = aclCache
+
   private def getAclsFromCache(resource: Resource): VersionedAcls = {
     aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
   }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 0aec85a..2dd5944 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -31,7 +31,7 @@ import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
 
-import scala.collection.{Map, Set, mutable}
+import scala.collection.{Map, Seq, Set, mutable}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index cfa599d..1ed55fb 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -88,8 +88,8 @@ class AdminManager(val config: KafkaConfig,
     val metadata = toCreate.values.map(topic =>
       try {
         val configs = new Properties()
-        topic.configs().asScala.foreach { case entry =>
-          configs.setProperty(entry.name(), entry.value())
+        topic.configs.asScala.foreach { entry =>
+          configs.setProperty(entry.name, entry.value)
         }
         LogConfig.validate(configs)
 
@@ -131,17 +131,9 @@ class AdminManager(val config: KafkaConfig,
             val javaAssignments = if (topic.assignments().isEmpty) {
               null
             } else {
-              val map = new java.util.HashMap[Integer, java.util.List[Integer]]
-              assignments.foreach {
-                case (k, v) => {
-                  val list = new java.util.ArrayList[Integer]
-                  v.foreach {
-                    case i => list.add(Integer.valueOf(i))
-                  }
-                  map.put(k, list)
-                }
-              }
-              map
+              assignments.map { case (k, v) =>
+                (k: java.lang.Integer) -> v.map(i => i: java.lang.Integer).asJava
+              }.asJava
             }
             val javaConfigs = new java.util.HashMap[String, String]
             topic.configs().asScala.foreach(config => javaConfigs.put(config.name(), config.value()))
@@ -169,7 +161,7 @@ class AdminManager(val config: KafkaConfig,
         case e: Throwable =>
           error(s"Error processing create topic request $topic", e)
           CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(e))
-      })
+      }).toIndexedSeq
 
     // 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
     if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
@@ -184,9 +176,8 @@ class AdminManager(val config: KafkaConfig,
       responseCallback(results)
     } else {
       // 3. else pass the assignments and errors to the delayed operation and set the keys
-      val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, responseCallback)
-      val delayedCreateKeys = toCreate.values.map(
-        topic => new TopicKey(topic.name())).toSeq
+      val delayedCreate = new DelayedCreatePartitions(timeout, metadata, this, responseCallback)
+      val delayedCreateKeys = toCreate.values.map(topic => new TopicKey(topic.name)).toIndexedSeq
       // try to complete the request immediately, otherwise put it into the purgatory
       topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
     }
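
Note on the `javaAssignments` hunk above: the hand-written loop over `java.util.HashMap` and `java.util.ArrayList` is replaced by boxing each element and calling `asJava`. A self-contained sketch of that conversion, assuming a small hypothetical `assignments` map in place of the request data:

```scala
import scala.collection.JavaConverters._

object JavaAssignmentConversion {
  // Hypothetical partition -> replica assignment.
  val assignments: Map[Int, Seq[Int]] = Map(0 -> Seq(1, 2), 1 -> Seq(2, 3))

  // Type ascriptions box Int to java.lang.Integer; asJava wraps the Scala
  // collections as java.util views without copying the elements.
  val javaAssignments: java.util.Map[java.lang.Integer, java.util.List[java.lang.Integer]] =
    assignments.map { case (k, v) =>
      (k: java.lang.Integer) -> v.map(i => i: java.lang.Integer).asJava
    }.asJava
}
```

The wrappers returned by `asJava` are read-only views over immutable Scala collections, so a caller that tries to mutate them gets an `UnsupportedOperationException`.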
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 9817f21..959144c 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -310,7 +310,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
   }
 
   private def quotaLimit(metricTags: util.Map[String, String]): Double = {
-    Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).map(_.toDouble)getOrElse(Long.MaxValue)
+    Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).map(_.toDouble).getOrElse(Long.MaxValue)
   }
 
   /*
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 5a33ea2..0c3362d 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.metrics.Quota._
 import org.apache.kafka.common.utils.Sanitizer
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 import scala.util.Try
 
 /**
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 852f72d..4af8252 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -115,7 +115,7 @@ class DelayedDeleteRecords(delayMs: Long,
    * Upon completion, return the current response status along with the error code per partition
    */
   override def onComplete() {
-    val responseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+    val responseStatus = deleteRecordsStatus.map { case (k, status) => k -> status.responseStatus }
     responseCallback(responseStatus)
   }
 }
diff --git a/core/src/main/scala/kafka/server/DelayedElectLeader.scala b/core/src/main/scala/kafka/server/DelayedElectLeader.scala
index 9a1546b..95b577a 100644
--- a/core/src/main/scala/kafka/server/DelayedElectLeader.scala
+++ b/core/src/main/scala/kafka/server/DelayedElectLeader.scala
@@ -34,8 +34,8 @@ class DelayedElectLeader(
   responseCallback: Map[TopicPartition, ApiError] => Unit
 ) extends DelayedOperation(delayMs) {
 
-  var waitingPartitions = expectedLeaders
-  val fullResults = results.to[mutable.Set]
+  private var waitingPartitions = expectedLeaders
+  private val fullResults = mutable.Map() ++= results
 
 
   /**
@@ -50,8 +50,8 @@ class DelayedElectLeader(
   override def onComplete(): Unit = {
     // This could be called to force complete, so I need the full list of partitions, so I can time them all out.
     updateWaiting()
-    val timedout = waitingPartitions.map{
-      case (tp, leader) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null)
+    val timedout = waitingPartitions.map {
+      case (tp, _) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null)
     }.toMap
     responseCallback(timedout ++ fullResults)
   }
@@ -70,7 +70,7 @@ class DelayedElectLeader(
   }
 
   private def updateWaiting() = {
-    waitingPartitions.foreach{case (tp, leader) =>
+    waitingPartitions.foreach { case (tp, leader) =>
       val ps = replicaManager.metadataCache.getPartitionInfo(tp.topic, tp.partition)
       ps match {
         case Some(ps) =>
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index f1d1407..14b6aca 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -125,7 +125,7 @@ class DelayedProduce(delayMs: Long,
    * Upon completion, return the current response status along with the error code per partition
    */
   override def onComplete() {
-    val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
+    val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
     responseCallback(responseStatus)
   }
 }
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 4f27226..c384afb 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -732,7 +732,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
       case reporter: Reconfigurable => dynamicConfig.maybeReconfigure(reporter, dynamicConfig.currentKafkaConfig, configs)
       case _ =>
     }
-    val added = updatedMetricsReporters -- currentReporters.keySet
+    val added = updatedMetricsReporters.filterNot(currentReporters.keySet)
     createReporters(added.asJava, configs)
   }
 
@@ -844,9 +844,9 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
 
     def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
-      newConfig.originals.asScala
-        .filterKeys(_.startsWith(prefix))
-        .filterKeys(k => !DynamicSecurityConfigs.contains(k))
+      newConfig.originals.asScala.filter { case (key, _) =>
+        key.startsWith(prefix) && !DynamicSecurityConfigs.contains(key)
+      }
     }
 
     val oldConfig = server.config
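
Note on the `filterKeys` replacement above: per the commit message, `filterKeys` returns a view in Scala 2.13 that doesn't implement the `Map` trait, so the two chained calls are folded into one `filter` over key/value pairs. A minimal sketch with hypothetical config keys and a hypothetical `dynamicKeys` set standing in for `DynamicSecurityConfigs`:

```scala
object FilterKeysMigration {
  // Hypothetical broker configs for illustration only.
  val configs: Map[String, String] = Map(
    "listener.name.internal.ssl.keystore.location" -> "/a",
    "listener.name.internal.port" -> "9092",
    "log.dirs" -> "/data"
  )

  // Hypothetical stand-in for DynamicSecurityConfigs.
  val dynamicKeys: Set[String] = Set("listener.name.internal.ssl.keystore.location")

  // One strict filter replaces the two chained filterKeys calls.
  def immutableListenerConfigs(prefix: String): Map[String, String] =
    configs.filter { case (key, _) =>
      key.startsWith(prefix) && !dynamicKeys.contains(key)
    }
}
```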
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d06c2fb..7ce7d99 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -83,7 +83,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
-import scala.collection._
+import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 
@@ -316,7 +316,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
     val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
     // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
+    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) {
       val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
       if (isDebugEnabled)
         combinedCommitStatus.foreach { case (topicPartition, error) =>
@@ -402,13 +402,13 @@ class KafkaApis(val requestChannel: RequestChannel,
         //   - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
         //   - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
         val currentTimestamp = time.milliseconds
-        val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
+        val partitionData = authorizedTopicRequestInfo.map { case (k, partitionData) =>
           val metadata = if (partitionData.committedMetadata() == null)
             OffsetAndMetadata.NoMetadata
           else
             partitionData.committedMetadata()
 
-          new OffsetAndMetadata(
+          k -> new OffsetAndMetadata(
             offset = partitionData.committedOffset(),
             leaderEpoch = Optional.ofNullable[Integer](partitionData.committedLeaderEpoch),
             metadata = metadata,
@@ -1595,22 +1595,21 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       })
       val toCreate = mutable.Map[String, CreatableTopic]()
-      createTopicsRequest.data.topics.asScala.foreach { case topic =>
-        if (results.find(topic.name()).errorCode() == 0) {
-          toCreate += topic.name() -> topic
+      createTopicsRequest.data.topics.asScala.foreach { topic =>
+        if (results.find(topic.name).errorCode == Errors.NONE.code) {
+          toCreate += topic.name -> topic
         }
       }
       def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
-        errors.foreach {
-          case (topicName, error) =>
-            results.find(topicName).
-              setErrorCode(error.error().code()).
-              setErrorMessage(error.message())
+        errors.foreach { case (topicName, error) =>
+          results.find(topicName).
+            setErrorCode(error.error.code).
+            setErrorMessage(error.message)
         }
         sendResponseCallback(results)
       }
-      adminManager.createTopics(createTopicsRequest.data.timeoutMs(),
-          createTopicsRequest.data.validateOnly(),
+      adminManager.createTopics(createTopicsRequest.data.timeoutMs,
+          createTopicsRequest.data.validateOnly,
           toCreate,
           handleCreateTopicsResults)
     }
@@ -1849,7 +1848,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       */
     def maybeSendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
       trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus")
-      val currentErrors = new ConcurrentHashMap[TopicPartition, Errors](responseStatus.mapValues(_.error).asJava)
+      val currentErrors = new ConcurrentHashMap[TopicPartition, Errors](responseStatus.map { case (k, v) => k -> v.error }.asJava)
       updateErrors(producerId, currentErrors)
       val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) =>
         topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionResponse.error == Errors.NONE
@@ -2503,7 +2502,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (!authorize(request.session, Alter, Resource.ClusterResource)) {
       val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)
       val partitionErrors: Map[TopicPartition, ApiError] =
-        electionRequest.topicPartitions.map(partition => partition -> error)(breakOut)
+        electionRequest.topicPartitions.iterator.map(partition => partition -> error).toMap
 
       sendResponseCallback(error)(partitionErrors)
     } else {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 60af03c..5123771 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -39,7 +39,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{Map, Seq}
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index bbe9aba..f5fc8ce 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -22,6 +22,8 @@ import java.util.Properties
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils.{Exit, Logging, VerifiableProperties}
 
+import scala.collection.Seq
+
 object KafkaServerStartable {
   def fromProps(serverProps: Properties) = {
     val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b107974..71f38c2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -45,11 +45,11 @@ import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.{ApiError, DeleteRecordsResponse, DescribeLogDirsResponse, EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, LeaderAndIsrResponse, OffsetsForLeaderEpochRequest, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
-import scala.collection._
+import scala.collection.{Map, Seq, Set, mutable}
 
 /*
  * Result metadata of a log append operation on the log
@@ -193,7 +193,7 @@ class ReplicaManager(val config: KafkaConfig,
   val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
   val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
-  @volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
+  @volatile var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
     (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
 
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
@@ -487,7 +487,7 @@ class ReplicaManager(val config: KafkaConfig,
                   new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status
       }
 
-      recordConversionStatsCallback(localProduceResults.mapValues(_.info.recordConversionStats))
+      recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
 
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
@@ -504,7 +504,7 @@ class ReplicaManager(val config: KafkaConfig,
 
       } else {
         // we can respond immediately
-        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
+        val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
         responseCallback(produceResponseStatus)
       }
     } else {
@@ -709,7 +709,7 @@ class ReplicaManager(val config: KafkaConfig,
       delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys)
     } else {
       // we can respond immediately
-      val deleteRecordsResponseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+      val deleteRecordsResponseStatus = deleteRecordsStatus.map { case (k, status) => k -> status.responseStatus }
       responseCallback(deleteRecordsResponseStatus)
     }
   }
@@ -1481,7 +1481,7 @@ class ReplicaManager(val config: KafkaConfig,
       newOfflinePartitions.map(_.topic).foreach { topic: String =>
         maybeRemoveTopicMetrics(topic)
       }
-      highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir)
+      highWatermarkCheckpoints = highWatermarkCheckpoints.filter { case (checkpointDir, _) => checkpointDir != dir }
 
       info(s"Broker $localBrokerId stopped fetcher for partitions ${newOfflinePartitions.mkString(",")} and stopped moving logs " +
            s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
@@ -1566,9 +1566,9 @@ class ReplicaManager(val config: KafkaConfig,
       }
 
       if (expectedLeaders.nonEmpty) {
-        val watchKeys: Seq[TopicPartitionOperationKey] = expectedLeaders.map{
+        val watchKeys = expectedLeaders.iterator.map {
           case (tp, _) => TopicPartitionOperationKey(tp)
-        }(breakOut)
+        }.toIndexedSeq
 
         delayedElectLeaderPurgatory.tryCompleteElseWatch(
           new DelayedElectLeader(
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index 7835c9d..59d8172 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -17,13 +17,15 @@
 package kafka.server
 
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import scala.collection.Seq
 
 import kafka.server.Constants._
 import kafka.server.ReplicationQuotaManagerConfig._
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.metrics._
-import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.stats.SimpleRate
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index 1878ae2..c14c1ef 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -45,7 +45,7 @@ class CheckpointFile[T](val file: File,
   try Files.createFile(file.toPath) // create the file if it doesn't exist
   catch { case _: FileAlreadyExistsException => }
 
-  def write(entries: Seq[T]) {
+  def write(entries: Iterable[T]) {
     lock synchronized {
       try {
         // write to temp file and then swap with the existing file
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 69e62d2..b330368 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -56,7 +56,7 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
   val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
     OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
 
-  def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq)
+  def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets)
 
   def read(): Map[TopicPartition, Long] = checkpoint.read().toMap
 
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 7219ee1..e6dd064 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -24,7 +24,8 @@ import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 
-import scala.collection.mutable.ListBuffer
+import scala.collection.Seq
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
@@ -42,7 +43,10 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
   this.logIdent = s"[LeaderEpochCache $topicPartition] "
 
   private val lock = new ReentrantReadWriteLock()
-  private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) }
+  private var epochs: ArrayBuffer[EpochEntry] = inWriteLock(lock) {
+    val read = checkpoint.read()
+    new ArrayBuffer(read.size) ++= read
+  }
 
   /**
     * Assigns the supplied Leader Epoch to the supplied Offset
@@ -223,7 +227,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
   }
 
   // Visible for testing
-  def epochEntries: ListBuffer[EpochEntry] = {
+  def epochEntries: Seq[EpochEntry] = {
     epochs
   }
 
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 3474974..b8b2ad6 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 
 object GetOffsetShell {
 
@@ -127,7 +128,9 @@ object GetOffsetShell {
       case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala
       case _ =>
         val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava
-        consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(x => if (x == null) null else x.offset)
+        consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, x) =>
+          if (x == null) (k, null) else (k, x.offset: java.lang.Long)
+        }
     }
 
     partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 2afec15..e3f969b 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -43,6 +43,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 
 /**
  * For verifying the consistency among replicas.
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 6ec59d8..09f1240 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import scala.collection.JavaConverters;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -261,22 +262,29 @@ public class StreamsResetter {
             CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified");
         }
 
-        final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
-        allScenarioOptions.$plus(toOffsetOption);
-        allScenarioOptions.$plus(toDatetimeOption);
-        allScenarioOptions.$plus(byDurationOption);
-        allScenarioOptions.$plus(toEarliestOption);
-        allScenarioOptions.$plus(toLatestOption);
-        allScenarioOptions.$plus(fromFileOption);
-        allScenarioOptions.$plus(shiftByOption);
-
-        CommandLineUtils.checkInvalidArgs(optionParser, options, toOffsetOption, allScenarioOptions.$minus(toOffsetOption));
-        CommandLineUtils.checkInvalidArgs(optionParser, options, toDatetimeOption, allScenarioOptions.$minus(toDatetimeOption));
-        CommandLineUtils.checkInvalidArgs(optionParser, options, byDurationOption, allScenarioOptions.$minus(byDurationOption));
-        CommandLineUtils.checkInvalidArgs(optionParser, options, toEarliestOption, allScenarioOptions.$minus(toEarliestOption));
-        CommandLineUtils.checkInvalidArgs(optionParser, options, toLatestOption, allScenarioOptions.$minus(toLatestOption));
-        CommandLineUtils.checkInvalidArgs(optionParser, options, fromFileOption, allScenarioOptions.$minus(fromFileOption));
-        CommandLineUtils.checkInvalidArgs(optionParser, options, shiftByOption, allScenarioOptions.$minus(shiftByOption));
+        final Set<OptionSpec<?>> allScenarioOptions = new HashSet<>();
+        allScenarioOptions.add(toOffsetOption);
+        allScenarioOptions.add(toDatetimeOption);
+        allScenarioOptions.add(byDurationOption);
+        allScenarioOptions.add(toEarliestOption);
+        allScenarioOptions.add(toLatestOption);
+        allScenarioOptions.add(fromFileOption);
+        allScenarioOptions.add(shiftByOption);
+
+        checkInvalidArgs(optionParser, options, allScenarioOptions, toOffsetOption);
+        checkInvalidArgs(optionParser, options, allScenarioOptions, toDatetimeOption);
+        checkInvalidArgs(optionParser, options, allScenarioOptions, byDurationOption);
+        checkInvalidArgs(optionParser, options, allScenarioOptions, toEarliestOption);
+        checkInvalidArgs(optionParser, options, allScenarioOptions, toLatestOption);
+        checkInvalidArgs(optionParser, options, allScenarioOptions, fromFileOption);
+        checkInvalidArgs(optionParser, options, allScenarioOptions, shiftByOption);
+    }
+
+    private <T> void checkInvalidArgs(OptionParser optionParser, OptionSet options, Set<OptionSpec<?>> allOptions,
+                                  OptionSpec<T> option) {
+        Set<OptionSpec<?>> invalidOptions = new HashSet<>(allOptions);
+        invalidOptions.remove(option);
+        CommandLineUtils.checkInvalidArgs(optionParser, options, option, JavaConverters.asScalaSetConverter(invalidOptions).asScala());
     }
 
     private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig,
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 778cc0d..8f1fd78 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -28,7 +28,7 @@ import com.typesafe.scalalogging.Logger
 import javax.management._
 
 import scala.collection._
-import scala.collection.mutable
+import scala.collection.{Seq, mutable}
 import kafka.cluster.EndPoint
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -50,10 +50,10 @@ object CoreUtils {
   private val logger = Logger(getClass)
 
   /**
-   * Return the smallest element in `traversable` if it is not empty. Otherwise return `ifEmpty`.
+   * Return the smallest element in `iterable` if it is not empty. Otherwise return `ifEmpty`.
    */
-  def min[A, B >: A](traversable: TraversableOnce[A], ifEmpty: A)(implicit cmp: Ordering[B]): A =
-    if (traversable.isEmpty) ifEmpty else traversable.min(cmp)
+  def min[A, B >: A](iterable: Iterable[A], ifEmpty: A)(implicit cmp: Ordering[B]): A =
+    if (iterable.isEmpty) ifEmpty else iterable.min(cmp)
 
   /**
    * Wrap the given function in a java.lang.Runnable
@@ -260,31 +260,6 @@ object CoreUtils {
 
   def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun)
 
-
-  //JSON strings need to be escaped based on ECMA-404 standard http://json.org
-  def JSONEscapeString (s : String) : String = {
-    s.map {
-      case '"'  => "\\\""
-      case '\\' => "\\\\"
-      case '/'  => "\\/"
-      case '\b' => "\\b"
-      case '\f' => "\\f"
-      case '\n' => "\\n"
-      case '\r' => "\\r"
-      case '\t' => "\\t"
-      /* We'll unicode escape any control characters. These include:
-       * 0x0 -> 0x1f  : ASCII Control (C0 Control Codes)
-       * 0x7f         : ASCII DELETE
-       * 0x80 -> 0x9f : C1 Control Codes
-       *
-       * Per RFC4627, section 2.5, we're not technically required to
-       * encode the C1 codes, but we do to be safe.
-       */
-      case c if (c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f') => "\\u%04x".format(c: Int)
-      case c => c
-    }.mkString
-  }
-
   /**
    * Returns a list of duplicated items
    */
diff --git a/core/src/main/scala/kafka/utils/json/DecodeJson.scala b/core/src/main/scala/kafka/utils/json/DecodeJson.scala
index 0b57fa7..6962c03 100644
--- a/core/src/main/scala/kafka/utils/json/DecodeJson.scala
+++ b/core/src/main/scala/kafka/utils/json/DecodeJson.scala
@@ -17,10 +17,10 @@
 
 package kafka.utils.json
 
-import scala.collection._
+import scala.collection.{Map, Seq}
+import scala.collection.compat._
 import scala.language.higherKinds
-import JavaConverters._
-import generic.CanBuildFrom
+import scala.collection.JavaConverters._
 
 import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
 
@@ -88,7 +88,7 @@ object DecodeJson {
     }
   }
 
-  implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], cbf: CanBuildFrom[Nothing, E, S[E]]): DecodeJson[S[E]] = new DecodeJson[S[E]] {
+  implicit def decodeSeq[E, S[+T] <: Seq[E]](implicit decodeJson: DecodeJson[E], factory: Factory[E, S[E]]): DecodeJson[S[E]] = new DecodeJson[S[E]] {
     def decodeEither(node: JsonNode): Either[String, S[E]] = {
       if (node.isArray)
         decodeIterator(node.elements.asScala)(decodeJson.decodeEither)
@@ -96,16 +96,16 @@ object DecodeJson {
     }
   }
 
-  implicit def decodeMap[V, M[K, +V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], cbf: CanBuildFrom[Nothing, (String, V), M[String, V]]): DecodeJson[M[String, V]] = new DecodeJson[M[String, V]] {
+  implicit def decodeMap[V, M[K, +V] <: Map[K, V]](implicit decodeJson: DecodeJson[V], factory: Factory[(String, V), M[String, V]]): DecodeJson[M[String, V]] = new DecodeJson[M[String, V]] {
     def decodeEither(node: JsonNode): Either[String, M[String, V]] = {
       if (node.isObject)
-        decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).right.map(v => (e.getKey, v)))(cbf)
+        decodeIterator(node.fields.asScala)(e => decodeJson.decodeEither(e.getValue).right.map(v => (e.getKey, v)))
       else Left(s"Expected JSON object, received $node")
     }
   }
 
-  private def decodeIterator[S, T, C](it: Iterator[S])(f: S => Either[String, T])(implicit cbf: CanBuildFrom[Nothing, T, C]): Either[String, C] = {
-    val result = cbf()
+  private def decodeIterator[S, T, C](it: Iterator[S])(f: S => Either[String, T])(implicit factory: Factory[T, C]): Either[String, C] = {
+    val result = factory.newBuilder
     while (it.hasNext) {
       f(it.next) match {
         case Right(x) => result += x
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 572938e..ce82492 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -38,9 +38,7 @@ import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
-import scala.collection.Seq
-import scala.collection.breakOut
-import scala.collection.mutable
+import scala.collection.{Map, Seq, mutable}
 
 /**
  * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -261,11 +259,11 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     } catch {
       case e: ControllerMovedException => throw e
       case e: Exception =>
-        return UpdateLeaderAndIsrResult(leaderAndIsrs.keys.map(_ -> Left(e))(breakOut), Seq.empty)
+        return UpdateLeaderAndIsrResult(leaderAndIsrs.keys.iterator.map(_ -> Left(e)).toMap, Seq.empty)
     }
 
     val updatesToRetry = mutable.Buffer.empty[TopicPartition]
-    val finished: Map[TopicPartition, Either[Exception, LeaderAndIsr]] = setDataResponses.flatMap { setDataResponse =>
+    val finished = setDataResponses.iterator.flatMap { setDataResponse =>
       val partition = setDataResponse.ctx.get.asInstanceOf[TopicPartition]
       setDataResponse.resultCode match {
         case Code.OK =>
@@ -278,7 +276,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
         case _ =>
           Some(partition -> Left(setDataResponse.resultException.get))
       }
-    }(breakOut)
+    }.toMap
 
     UpdateLeaderAndIsrResult(finished, updatesToRetry)
   }
@@ -1626,7 +1624,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     retryRequestsUntilConnected(createRequests, expectedControllerEpochZkVersion)
   }
 
-  private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int):Seq[CreateResponse] = {
+  private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
     val createRequests = topics.map { topic =>
       val path = TopicPartitionsZNode.path(topic)
       CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(topic))
@@ -1635,9 +1633,9 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private def getTopicConfigs(topics: Set[String]): Seq[GetDataResponse] = {
-    val getDataRequests: Seq[GetDataRequest] = topics.map { topic =>
+    val getDataRequests: Seq[GetDataRequest] = topics.iterator.map { topic =>
       GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic))
-    }(breakOut)
+    }.toIndexedSeq
 
     retryRequestsUntilConnected(getDataRequests)
   }
@@ -1662,7 +1660,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
-    val remainingRequests = mutable.ArrayBuffer(requests: _*)
+    val remainingRequests = new mutable.ArrayBuffer(requests.size) ++= requests
     val responses = new mutable.ArrayBuffer[Req#Response]
     while (remainingRequests.nonEmpty) {
       val batchResponses = zooKeeperClient.handleRequests(remainingRequests)
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index f8ad5ea..66e8764 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -43,7 +43,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
 import scala.beans.BeanProperty
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, breakOut}
+import scala.collection.{Map, Seq}
 import scala.util.{Failure, Success, Try}
 
 // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
@@ -405,9 +405,9 @@ object ReassignPartitionsZNode {
 
   def decode(bytes: Array[Byte]): Either[JsonProcessingException, collection.Map[TopicPartition, Seq[Int]]] =
     Json.parseBytesAs[PartitionAssignment](bytes).right.map { partitionAssignment =>
-      partitionAssignment.partitions.asScala.map { replicaAssignment =>
+      partitionAssignment.partitions.asScala.iterator.map { replicaAssignment =>
         new TopicPartition(replicaAssignment.topic, replicaAssignment.partition) -> replicaAssignment.replicas.asScala
-      }(breakOut)
+      }.toMap
     }
 }
 
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index c193ff2..8ae8e17 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -35,6 +35,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper._
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 import scala.collection.mutable.Set
 
 /**
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 1f4b5e2..28b1f06 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -20,6 +20,8 @@ package kafka.admin
 import java.io.{ByteArrayOutputStream, PrintStream}
 import java.nio.charset.StandardCharsets
 
+import scala.collection.Seq
+
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
@@ -33,7 +35,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
   def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
 
   @Test(timeout=120000)
-  def checkBrokerApiVersionCommandOutput() {
+  def checkBrokerApiVersionCommandOutput(): Unit = {
     val byteArrayOutputStream = new ByteArrayOutputStream
     val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
     BrokerApiVersionsCommand.execute(Array("--bootstrap-server", brokerList), printStream)
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 19a6b47..c473386 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -20,7 +20,7 @@ import org.junit.Test
 class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAwareTest {
 
   @Test
-  def testRackAwareReassign() {
+  def testRackAwareReassign(): Unit = {
     val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
     TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient)
 
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index a8d317c..c0a38f5 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -50,6 +50,7 @@ import org.junit.rules.Timeout
 import org.junit.{After, Before, Rule, Test}
 import org.scalatest.Assertions.intercept
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 466728d..e715525 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -23,6 +23,7 @@ import org.junit.Test
 import org.junit.Assert._
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 
 /**
  * Integration tests for the consumer that cover basic usage as well as coordinator failure
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 1bd1c81..778e3b8 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -31,6 +31,7 @@ import org.junit.{After, Ignore, Test}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.Seq
 
 /**
  * Integration tests for the consumer that cover basic usage as well as server failures
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 1394c83..174a3e3 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -18,7 +18,7 @@ import java.io.File
 import java.{lang, util}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-import java.util.{Collections, Properties}
+import java.util.Properties
 
 import kafka.api.GroupedUserPrincipalBuilder._
 import kafka.api.GroupedUserQuotaCallback._
@@ -315,7 +315,7 @@ object GroupedUserQuotaCallback {
   val QuotaGroupTag = "group"
   val DefaultProduceQuotaProp = "default.produce.quota"
   val DefaultFetchQuotaProp = "default.fetch.quota"
-  val UnlimitedQuotaMetricTags = Collections.emptyMap[String, String]
+  val UnlimitedQuotaMetricTags = new util.HashMap[String, String]
   val quotaLimitCalls = Map(
     ClientQuotaType.PRODUCE -> new AtomicInteger,
     ClientQuotaType.FETCH -> new AtomicInteger,
@@ -344,10 +344,8 @@ object GroupedUserQuotaCallback {
 class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable with Logging {
 
   var brokerId: Int = -1
-  val customQuotasUpdated = ClientQuotaType.values.toList
-    .map(quotaType =>(quotaType -> new AtomicBoolean)).toMap
-  val quotas = ClientQuotaType.values.toList
-    .map(quotaType => (quotaType -> new ConcurrentHashMap[String, Double])).toMap
+  val customQuotasUpdated = ClientQuotaType.values.map(quotaType => quotaType -> new AtomicBoolean).toMap
+  val quotas = ClientQuotaType.values.map(quotaType => quotaType -> new ConcurrentHashMap[String, Double]).toMap
 
   val partitionRatio = new ConcurrentHashMap[String, Double]()
 
@@ -398,7 +396,7 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
   override def updateClusterMetadata(cluster: Cluster): Boolean = {
     val topicsByGroup = cluster.topics.asScala.groupBy(group)
 
-    !topicsByGroup.forall { case (group, groupTopics) =>
+    topicsByGroup.map { case (group, groupTopics) =>
       val groupPartitions = groupTopics.flatMap(topic => cluster.partitionsForTopic(topic).asScala)
       val totalPartitions = groupPartitions.size
       val partitionsOnThisBroker = groupPartitions.count { p => p.leader != null && p.leader.id == brokerId }
@@ -409,7 +407,7 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w
       else
         partitionsOnThisBroker.toDouble / totalPartitions
       partitionRatio.put(group, multiplier) != multiplier
-    }
+    }.exists(identity)
   }
 
   override def updateQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity, newValue: Double): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 242a305..92742e4 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.junit.{After, Before}
 
 import scala.collection.mutable
+import scala.collection.Seq
 
 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 582b81d..c32f7c2 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -15,6 +15,8 @@ package kafka.api
 import java.io.File
 import java.util.{Locale, Properties}
 
+import scala.collection.Seq
+
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.network.ListenerName
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index bbe0dd8..d5bf887 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -21,6 +21,8 @@ import javax.security.auth.callback._
 import javax.security.auth.Subject
 import javax.security.auth.login.AppConfigurationEntry
 
+import scala.collection.Seq
+
 import kafka.server.KafkaConfig
 import kafka.utils.{TestUtils}
 import kafka.utils.JaasTestUtils._
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 81de105..c0cd9a0 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -21,6 +21,8 @@ import java.io.File
 import java.util.Properties
 import javax.security.auth.login.Configuration
 
+import scala.collection.Seq
+
 import kafka.admin.ConfigCommand
 import kafka.security.minikdc.MiniKdc
 import kafka.server.KafkaConfig
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 13ddd92..5b1888a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -36,6 +36,7 @@ import org.scalatest.Assertions.fail
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
+import scala.collection.Seq
 import scala.concurrent.ExecutionException
 
 class TransactionsTest extends KafkaServerTestHarness {
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 6a5a33d..2d8fac1 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -18,6 +18,8 @@ package kafka.server
 
 import java.util.Optional
 
+import scala.collection.Seq
+
 import kafka.cluster.Partition
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.FencedLeaderEpochException
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f08450a..011565e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1233,7 +1233,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit = {
     val alterResult = TestUtils.alterConfigs(servers, adminClients.head, newProps, perBrokerConfig)
     if (expectFailure) {
-      val oldProps = servers.head.config.values.asScala.filterKeys(newProps.containsKey)
+      val oldProps = servers.head.config.values.asScala.filter { case (k, _) => newProps.containsKey(k) }
       val brokerResources = if (perBrokerConfig)
         servers.map(server => new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString))
       else
@@ -1242,7 +1242,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
         val exception = intercept[ExecutionException](alterResult.values.get(brokerResource).get)
         assertTrue(exception.getCause.isInstanceOf[InvalidRequestException])
       }
-      servers.foreach { server => assertEquals(oldProps, server.config.values.asScala.filterKeys(newProps.containsKey)) }
+      servers.foreach { server =>
+        assertEquals(oldProps, server.config.values.asScala.filter { case (k, _) => newProps.containsKey(k) })
+      }
     } else {
       alterResult.all.get
       waitForConfig(aPropToVerify._1, aPropToVerify._2)
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
index f2c6753..39aa3c3 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
@@ -19,6 +19,8 @@ package kafka.server
 
 import java.util.Properties
 
+import scala.collection.Seq
+
 import kafka.utils.JaasTestUtils
 import kafka.utils.JaasTestUtils.JaasSection
 
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
index 00695ed..23746c4 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
@@ -19,6 +19,8 @@ package kafka.server
 
 import java.util.Properties
 
+import scala.collection.Seq
+
 import kafka.api.Both
 import kafka.utils.JaasTestUtils.JaasSection
 
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 95a4843..86d05a7 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -38,6 +38,7 @@ import org.junit.{After, Before, Test}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.Seq
 
 object MultipleListenersWithSameSecurityProtocolBaseTest {
   val SecureInternal = "SECURE_INTERNAL"
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index 0329f52..4384091 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -18,6 +18,8 @@ package kafka.tools
 
 import java.util.Properties
 
+import scala.collection.Seq
+
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer, NoRecordsException}
diff --git a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
index bece037..b065ed2 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala
@@ -148,7 +148,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
     val replicationFactor = 5
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
     val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
-    assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
+    assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.toIndexedSeq.map(_.size))
     val distribution = getReplicaDistribution(assignment, brokerRackMapping)
     for (partition <- 0 until numPartitions)
       assertEquals(3, distribution.partitionRacks(partition).toSet.size)
@@ -161,7 +161,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
     val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
       replicationFactor)
-    assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
+    assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.toIndexedSeq.map(_.size))
     val distribution = getReplicaDistribution(assignment, brokerRackMapping)
     for (partition <- 0 to 5)
       assertEquals(2, distribution.partitionRacks(partition).toSet.size)
@@ -173,7 +173,7 @@ class AdminRackAwareTest extends RackAwareTest with Logging {
     val replicationFactor = 3
     val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 -> "rack1", 4 -> "rack1", 5 -> "rack1")
     val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
-    assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
+    assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.toIndexedSeq.map(_.size))
     val distribution = getReplicaDistribution(assignment, brokerRackMapping)
     for (partition <- 0 until numPartitions)
       assertEquals(1, distribution.partitionRacks(partition).toSet.size)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 6e40cad..dfdef94 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -16,13 +16,16 @@
  */
 package kafka.admin
 
+import java.util.Properties
+
+import scala.collection.Seq
+
 import kafka.log.Log
 import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
 import kafka.utils.TestUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.junit.Assert._
 import org.junit.{After, Test}
-import java.util.Properties
 
 import kafka.admin.TopicCommand.ZookeeperTopicService
 import kafka.common.TopicAlreadyMarkedForDeletionException
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index 6f87687..ae78fa4 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -36,6 +36,7 @@ import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 import scala.concurrent.duration._
 
 final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
index 38f2430..1fc57a1 100644
--- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
@@ -21,6 +21,8 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 import java.util.Properties
 
+import scala.collection.Seq
+
 import kafka.common.AdminCommandFailedException
 import kafka.network.RequestChannel
 import kafka.security.auth._
@@ -56,7 +58,7 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
         brokerConfigs.foreach(p => p.setProperty("authorizer.class.name", className))
       case None =>
     }
-    createTestTopicAndCluster(topicPartition,brokerConfigs)
+    createTestTopicAndCluster(topicPartition, brokerConfigs)
   }
 
   private def createTestTopicAndCluster(partitionsAndAssignments: Map[TopicPartition, List[Int]],
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 6e5fcef..79620e7 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -37,6 +37,7 @@ import scala.collection.JavaConverters._
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.mutable
+import scala.collection.Seq
 
 class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
   var servers: Seq[KafkaServer] = Seq()
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 8b1bad6..a078419 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -16,6 +16,8 @@ import java.io.{BufferedWriter, File, FileWriter}
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Calendar, Date, Properties}
 
+import scala.collection.Seq
+
 import joptsimple.OptionException
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
 import kafka.server.KafkaConfig
@@ -325,7 +327,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     val tp1 = new TopicPartition(topic1, 0)
     val tp2 = new TopicPartition(topic2, 0)
 
-    val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset())
+    val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset).toMap
     assertEquals(Map(tp1 -> 0L, tp2 -> 0L), allResetOffsets)
     assertEquals(Map(tp1 -> 0L), committedOffsets(topic1))
     assertEquals(Map(tp2 -> 0L), committedOffsets(topic2))
@@ -353,7 +355,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
 
     val tp1 = new TopicPartition(topic1, 1)
     val tp2 = new TopicPartition(topic2, 1)
-    val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset())
+    val allResetOffsets = resetOffsets(consumerGroupCommand)(group).mapValues(_.offset).toMap
     assertEquals(Map(tp1 -> 0, tp2 -> 0), allResetOffsets)
 
     assertEquals(priorCommittedOffsets1 + (tp1 -> 0L), committedOffsets(topic1))
@@ -383,12 +385,12 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     val bw = new BufferedWriter(new FileWriter(file))
     bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets))
     bw.close()
-    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), exportedOffsets(group).mapValues(_.offset))
+    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), exportedOffsets(group).mapValues(_.offset).toMap)
 
     val cgcArgsExec = buildArgsForGroup(group, "--all-topics", "--from-file", file.getCanonicalPath, "--dry-run")
     val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
     val importedOffsets = consumerGroupCommandExec.resetOffsets()
-    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), importedOffsets(group).mapValues(_.offset))
+    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), importedOffsets(group).mapValues(_.offset).toMap)
 
     adminZkClient.deleteTopic(topic)
   }
@@ -421,21 +423,21 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     val bw = new BufferedWriter(new FileWriter(file))
     bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets))
     bw.close()
-    assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), exportedOffsets(group1).mapValues(_.offset))
-    assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), exportedOffsets(group2).mapValues(_.offset))
+    assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), exportedOffsets(group1).mapValues(_.offset).toMap)
+    assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), exportedOffsets(group2).mapValues(_.offset).toMap)
 
     // Multiple --group's offset import
     val cgcArgsExec = buildArgsForGroups(Seq(group1, group2), "--all-topics", "--from-file", file.getCanonicalPath, "--dry-run")
     val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
     val importedOffsets = consumerGroupCommandExec.resetOffsets()
-    assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets(group1).mapValues(_.offset))
-    assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), importedOffsets(group2).mapValues(_.offset))
+    assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets(group1).mapValues(_.offset).toMap)
+    assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), importedOffsets(group2).mapValues(_.offset).toMap)
 
     // Single --group offset import using "group,topic,partition,offset" csv format
     val cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", "--from-file", file.getCanonicalPath, "--dry-run")
     val consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2)
     val importedOffsets2 = consumerGroupCommandExec2.resetOffsets()
-    assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets2(group1).mapValues(_.offset))
+    assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets2(group1).mapValues(_.offset).toMap)
 
     adminZkClient.deleteTopic(topic)
   }
@@ -482,7 +484,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
         (group, partitionInfo) <- resetOffsetsResultByGroup
       } {
         val priorOffsets = committedOffsets(topic = topic, group = group)
-        assertEquals(expectedOffsets(topic), partitionInfo.filter(partitionInfo => partitionInfo._1.topic() == topic).mapValues(_.offset))
+        assertEquals(expectedOffsets(topic), partitionInfo.filter(partitionInfo => partitionInfo._1.topic() == topic).mapValues(_.offset).toMap)
         assertEquals(if (dryRun) priorOffsets else expectedOffsets(topic), committedOffsets(topic = topic, group = group))
       }
     } finally {
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index de7fd6b..a98f751 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -37,6 +37,7 @@ import org.junit.{After, Before, Rule, Test}
 import org.scalatest.Assertions.{fail, intercept}
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 import scala.concurrent.ExecutionException
 import scala.util.Random
 
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index bb1270d..9989fc0 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -754,7 +754,7 @@ class PartitionTest {
     }
 
     assertThrows[IllegalArgumentException] {
-      val replica = partition.getOrCreateReplica(brokerId)
+      partition.getOrCreateReplica(brokerId)
     }
 
     val remoteReplicaId = brokerId + 1;
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 2572969..12f55ed 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.common.resource.PatternType.LITERAL
 import org.junit.{After, Before, Test}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.Seq
 
 class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
 
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index 9f09231..afe5308 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -71,10 +71,10 @@ class ControllerChannelManagerTest {
     assertEquals(controllerId, leaderAndIsrRequest.controllerId)
     assertEquals(controllerEpoch, leaderAndIsrRequest.controllerEpoch)
     assertEquals(partitions.keySet, leaderAndIsrRequest.partitionStates.keySet.asScala)
-    assertEquals(partitions.mapValues(_.leader),
-      leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.leader))
-    assertEquals(partitions.mapValues(_.isr),
-      leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala))
+    assertEquals(partitions.map { case (k, v) => (k, v.leader) },
+      leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.leader).toMap)
+    assertEquals(partitions.map { case (k, v) => (k, v.isr) },
+      leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala).toMap)
 
     applyLeaderAndIsrResponseCallbacks(Errors.NONE, batch.sentRequests(2).toList)
     assertEquals(1, batch.sentEvents.size)
@@ -202,8 +202,10 @@ class ControllerChannelManagerTest {
 
     val updateMetadataRequest = updateMetadataRequests.head
     assertEquals(3, updateMetadataRequest.partitionStates.size)
-    assertEquals(partitions.mapValues(_.leader), updateMetadataRequest.partitionStates.asScala.mapValues(_.basePartitionState.leader))
-    assertEquals(partitions.mapValues(_.isr), updateMetadataRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala))
+    assertEquals(partitions.map { case (k, v) => (k, v.leader) },
+      updateMetadataRequest.partitionStates.asScala.map { case (k, v) => (k, v.basePartitionState.leader) })
+    assertEquals(partitions.map { case (k, v) => (k, v.isr) },
+      updateMetadataRequest.partitionStates.asScala.map { case (k, v) => (k, v.basePartitionState.isr.asScala) })
 
     assertEquals(controllerId, updateMetadataRequest.controllerId)
     assertEquals(controllerEpoch, updateMetadataRequest.controllerEpoch)
@@ -272,9 +274,11 @@ class ControllerChannelManagerTest {
       .map(_.basePartitionState.leader)
       .forall(leaderId => leaderId == LeaderAndIsr.LeaderDuringDelete))
 
-    assertEquals(partitions.filterKeys(_.topic == "bar").mapValues(_.leader),
-      updateMetadataRequest.partitionStates.asScala.filterKeys(_.topic == "bar").mapValues(_.basePartitionState.leader))
-    assertEquals(partitions.mapValues(_.isr), updateMetadataRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala))
+    assertEquals(partitions.filter { case (k, _) => k.topic == "bar" }.map { case (k, v) => (k, v.leader) },
+      updateMetadataRequest.partitionStates.asScala.filter { case (k, _) => k.topic == "bar" }.map { case (k, v) =>
+        (k, v.basePartitionState.leader) })
+    assertEquals(partitions.map { case (k, v) => (k, v.isr) },
+      updateMetadataRequest.partitionStates.asScala.map { case (k, v) => (k, v.basePartitionState.isr.asScala) })
 
     assertEquals(3, updateMetadataRequest.liveBrokers.size)
     assertEquals(Set(1, 2, 3), updateMetadataRequest.liveBrokers.asScala.map(_.id).toSet)
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index 2c4f66d..fc8f324 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -103,7 +103,7 @@ class ControllerEventManagerTest {
   @Test
   def testSuccessfulEvent(): Unit = {
     check("kafka.controller:type=ControllerStats,name=AutoLeaderBalanceRateAndTimeMs",
-      AutoPreferredReplicaLeaderElection, () => Unit)
+      AutoPreferredReplicaLeaderElection, () => ())
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index d57d78e..a69a4a2 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -37,6 +37,7 @@ import org.scalatest.Assertions.fail
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.Seq
 import scala.util.{Failure, Success, Try}
 
 class ControllerIntegrationTest extends ZooKeeperTestHarness {
diff --git a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
index 94fd177..0c6c00d 100644
--- a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
@@ -21,7 +21,7 @@ import kafka.common.StateChangeFailedException
 import kafka.controller.Election._
 import org.apache.kafka.common.TopicPartition
 
-import scala.collection.{breakOut, mutable}
+import scala.collection.{Seq, mutable}
 
 class MockPartitionStateMachine(controllerContext: ControllerContext,
                                 uncleanLeaderElectionEnabled: Boolean)
@@ -111,7 +111,7 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
       }
 
       partition -> value
-    }(breakOut)
+    }.toMap
 
     results ++ failedElections
   }
diff --git a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
index 248a5de..e5207bf 100644
--- a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
@@ -16,6 +16,8 @@
  */
 package kafka.controller
 
+import scala.collection.Seq
+
 class MockReplicaStateMachine(controllerContext: ControllerContext) extends ReplicaStateMachine(controllerContext) {
 
   override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 9482dfa..9ca3bd2 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -30,7 +30,6 @@ import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.mockito.Mockito
-import scala.collection.breakOut
 
 class PartitionStateMachineTest {
   private var controllerContext: ControllerContext = null
@@ -431,9 +430,9 @@ class PartitionStateMachineTest {
     def prepareMockToUpdateLeaderAndIsr(): Unit = {
       val updatedLeaderAndIsr: Map[TopicPartition, LeaderAndIsr] = partitions.map { partition =>
         partition -> leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId))
-      }(breakOut)
+      }.toMap
       EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr, controllerEpoch, controllerContext.epochZkVersion))
-        .andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr.mapValues(Right(_)), Seq.empty))
+        .andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr.mapValues(Right(_)).toMap, Seq.empty))
     }
     prepareMockToUpdateLeaderAndIsr()
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index f9b571e..6761b0c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -104,7 +104,7 @@ class GroupMetadataManagerTest {
     )
 
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
-    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, offsetCommitRecords: _*)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, offsetCommitRecords.toArray: _*)
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
     EasyMock.replay(replicaManager)
@@ -135,7 +135,7 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+      (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -566,7 +566,7 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val tombstone = new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      offsetCommitRecords ++ Seq(tombstone): _*)
+      (offsetCommitRecords ++ Seq(tombstone)).toArray: _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -603,7 +603,7 @@ class GroupMetadataManagerTest {
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+      (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -635,7 +635,7 @@ class GroupMetadataManagerTest {
       protocolType = "consumer", protocol = "range", memberId)
     val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      Seq(groupMetadataRecord, groupMetadataTombstone): _*)
+      Seq(groupMetadataRecord, groupMetadataTombstone).toArray: _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -664,7 +664,7 @@ class GroupMetadataManagerTest {
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
       protocolType = "consumer", protocol = "range", memberId, assignmentSize)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+      (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -700,7 +700,7 @@ class GroupMetadataManagerTest {
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
     val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*)
+      (Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords).toArray: _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -734,15 +734,15 @@ class GroupMetadataManagerTest {
     val segment1MemberId = "a"
     val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L)
     val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(
-        generation, protocolType, protocol, segment1MemberId)): _*)
+      (createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(
+        generation, protocolType, protocol, segment1MemberId))).toArray: _*)
     val segment1End = expectGroupMetadataLoad(logMock, startOffset, segment1Records)
 
     val segment2MemberId = "b"
     val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L)
     val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE,
-      createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(
-        generation, protocolType, protocol, segment2MemberId)): _*)
+      (createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(
+        generation, protocolType, protocol, segment2MemberId))).toArray: _*)
     val segment2End = expectGroupMetadataLoad(logMock, segment1End, segment2Records)
 
     EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(segment2End))
@@ -1686,7 +1686,7 @@ class GroupMetadataManagerTest {
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+      (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -1726,7 +1726,7 @@ class GroupMetadataManagerTest {
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+      (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
 
@@ -1856,7 +1856,7 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+      (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
 
     // Prepend empty control batch to valid records
     val mockBatch: MutableRecordBatch = EasyMock.createMock(classOf[MutableRecordBatch])
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 3cf9566..9719a48 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -251,7 +251,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
 
     val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId)
     val startOffset = replicaManager.getLogEndOffset(topicPartition).getOrElse(20L)
-    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecordsByPartition(partitionId): _*)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecordsByPartition(partitionId).toArray: _*)
     val endOffset = startOffset + records.records.asScala.size
 
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index bee333d..08d0b32 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -156,7 +156,7 @@ class TransactionStateManagerTest {
     txnRecords += new SimpleRecord(txnMessageKeyBytes2, TransactionLog.valueToBytes(txnMetadata2.prepareNoTransit()))
 
     val startOffset = 15L   // it should work for any start offset
-    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords: _*)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*)
 
     prepareTxnLog(topicPartition, startOffset, records)
 
@@ -538,7 +538,7 @@ class TransactionStateManagerTest {
 
     txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
     val startOffset = 0L
-    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords: _*)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*)
 
     prepareTxnLog(topicPartition, 0, records)
 
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 33001fd..dd6b8d0 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -26,6 +26,7 @@ import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.junit.{After, Before}
 
+import scala.collection.Seq
 import scala.collection.mutable.{ArrayBuffer, Buffer}
 import java.util.Properties
 
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 64d7174..daa0982 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -22,6 +22,7 @@ import org.junit.{After, Before, Test}
 
 import scala.util.Random
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 import org.apache.log4j.{Level, Logger}
 import java.util.Properties
 import java.util.concurrent.ExecutionException
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index e3df4f0..5282e6d 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -33,7 +33,6 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
-import scala.Seq
 import scala.collection._
 
 /**
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 42ad8a0..984509c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1283,7 +1283,7 @@ class LogCleanerTest {
 
     val numSegmentsInitial = log.logSegments.size
     val allKeys = LogTest.keysInLog(log).toList
-    val expectedKeysAfterCleaning = mutable.MutableList[Long]()
+    val expectedKeysAfterCleaning = new mutable.ArrayBuffer[Long]()
 
     // pretend we want to clean every alternate key
     val offsetMap = new FakeOffsetMap(Int.MaxValue)
@@ -1642,9 +1642,9 @@ class LogCleanerTest {
         new SimpleRecord(time.milliseconds(), keyBytes, keyBytes) // the value doesn't matter since we validate offsets
       }
       val records = if (isTransactional)
-        MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
+        MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords.toArray: _*)
       else
-        MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
+        MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords.toArray: _*)
       sequence += simpleRecords.size
       log.appendAsLeader(records, leaderEpoch, isFromClient)
     }
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 5162d7e..7bed0d5 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -807,8 +807,7 @@ class SocketServerTest {
     def requestMetricMeters = YammerMetrics
       .defaultRegistry
       .allMetrics.asScala
-      .filterKeys(k => k.getType == "RequestMetrics")
-      .collect { case (k, metric: Meter) => (k.toString, metric.count) }
+      .collect { case (k, metric: Meter) if k.getType == "RequestMetrics" => (k.toString, metric.count) }
 
     assertEquals(nonZeroMeters, requestMetricMeters.filter { case (_, value) => value != 0 })
     server.shutdown()
@@ -822,7 +821,7 @@ class SocketServerTest {
     val nonZeroMetricNamesAndValues = YammerMetrics
       .defaultRegistry
       .allMetrics.asScala
-      .filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent"))
+      .filter { case (k, _) => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent") }
       .collect { case (k, metric: Gauge[_]) => (k, metric.value().asInstanceOf[Double]) }
       .filter { case (_, value) => value != 0.0 && !value.equals(Double.NaN) }
 
@@ -1068,7 +1067,7 @@ class SocketServerTest {
 
       testableSelector.operationCounts.clear()
       testableSelector.addFailure(SelectorOperation.Poll,
-          Some(new RuntimeException("ControlThrowable exception during poll()") with ControlThrowable))
+          Some(new ControlThrowable() {}))
       testableSelector.waitForOperations(SelectorOperation.Poll, 1)
 
       testableSelector.waitForOperations(SelectorOperation.CloseSelector, 1)
@@ -1086,8 +1085,10 @@ class SocketServerTest {
     val errors = new mutable.HashSet[String]
 
     def acceptorStackTraces: scala.collection.Map[Thread, String] = {
-      Thread.getAllStackTraces.asScala.filterKeys(_.getName.contains("kafka-socket-acceptor"))
-        .mapValues(_.toList.mkString("\n"))
+      Thread.getAllStackTraces.asScala.collect {
+        case (thread, stacktraceElement) if thread.getName.contains("kafka-socket-acceptor") =>
+          thread -> stacktraceElement.mkString("\n")
+      }
     }
 
     def acceptorBlocked: Boolean = {
@@ -1247,7 +1248,7 @@ class SocketServerTest {
         extends Selector(config.socketRequestMaxBytes, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
             metrics, time, "socket-server", metricTags.asJava, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
 
-    val failures = mutable.Map[SelectorOperation, Exception]()
+    val failures = mutable.Map[SelectorOperation, Throwable]()
     val operationCounts = mutable.Map[SelectorOperation, Int]().withDefaultValue(0)
     val allChannels = mutable.Set[String]()
     val allLocallyClosedChannels = mutable.Set[String]()
@@ -1283,7 +1284,7 @@ class SocketServerTest {
     @volatile var pollTimeoutOverride: Option[Long] = None
     @volatile var pollCallback: () => Unit = () => {}
 
-    def addFailure(operation: SelectorOperation, exception: Option[Exception] = None) {
+    def addFailure(operation: SelectorOperation, exception: Option[Throwable] = None) {
       failures += operation ->
         exception.getOrElse(new IllegalStateException(s"Test exception during $operation"))
     }
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index 75038bf..06004bc 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -31,7 +31,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
   val brokerId = 0
 
   @After
-  override def tearDown() {
+  override def tearDown(): Unit = {
     TestUtils.shutdownServers(servers)
     super.tearDown()
   }
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
index d61c14c..eda0e55 100644
--- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
@@ -38,7 +38,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
   val topic = "topic"
 
   @Test
-  def testAlterReplicaLogDirsRequest() {
+  def testAlterReplicaLogDirsRequest(): Unit = {
     val partitionNum = 5
 
     // Alter replica dir before topic creation
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index a92aeaf..1e94e3f 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -22,6 +22,8 @@ import java.net.Socket
 import java.nio.ByteBuffer
 import java.util.Properties
 
+import scala.collection.Seq
+
 import kafka.api.IntegrationTestHarness
 import kafka.network.SocketServer
 import org.apache.kafka.common.network.ListenerName
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 74d9892..1cdd8f1 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -146,7 +146,7 @@ object CreateTopicsRequestWithPolicyTest {
         require(replicationFactor == null, s"replicationFactor should be null, but it is $replicationFactor")
         require(replicasAssignments != null, s"replicaAssigments should not be null, but it is $replicasAssignments")
 
-        replicasAssignments.asScala.foreach { case (partitionId, assignment) =>
+        replicasAssignments.asScala.toSeq.sortBy { case (tp, _) => tp }.foreach { case (partitionId, assignment) =>
           if (assignment.size < 2)
             throw new PolicyViolationException("Topic partitions should have at least 2 partitions, received " +
               s"${assignment.size} for partition $partitionId")
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index f5cc134..86d977c 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -31,7 +31,7 @@ import kafka.admin.AdminOperationException
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.common.TopicPartition
 
-import scala.collection.Map
+import scala.collection.{Map, Seq}
 import scala.collection.JavaConverters._
 
 class DynamicConfigChangeTest extends KafkaServerTestHarness {
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 8ef611a..7d3a58b 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -44,7 +44,7 @@ class FetchRequestTest extends BaseRequestTest {
 
   private var producer: KafkaProducer[String, String] = null
 
-  override def tearDown() {
+  override def tearDown(): Unit = {
     if (producer != null)
       producer.close()
     super.tearDown()
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index efdadbb..2fe67f7 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -30,6 +30,7 @@ import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
+import scala.collection.Seq
 import scala.collection.mutable.{HashMap, Map}
 
 class IsrExpirationTest {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index aac7ad1..41e7c38 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -54,7 +54,7 @@ import org.junit.Assert.{assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{Map, Seq}
 
 class KafkaApisTest {
 
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index ee7472f..e36b371 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -18,6 +18,8 @@ package kafka.server
 
 import java.util.Properties
 
+import scala.collection.Seq
+
 import kafka.utils.TestUtils
 import TestUtils._
 import kafka.zk.ZooKeeperTestHarness
@@ -73,7 +75,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   }
 
   @Before
-  override def setUp() {
+  override def setUp(): Unit = {
     super.setUp()
 
     configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
@@ -91,7 +93,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   }
 
   @After
-  override def tearDown() {
+  override def tearDown(): Unit = {
     producer.close()
     TestUtils.shutdownServers(servers)
     super.tearDown()
@@ -238,7 +240,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
   }
 
-  private def sendMessages(n: Int) {
+  private def sendMessages(n: Int): Unit = {
     (0 until n).map(_ => producer.send(new ProducerRecord(topic, 0, message))).foreach(_.get)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index f920d94..4041b35 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -31,6 +31,7 @@ import org.junit.{Before, Test}
 import org.apache.kafka.test.TestUtils.isValidClusterId
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 
 class MetadataRequestTest extends BaseRequestTest {
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 6607272..9005ec3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -311,7 +311,6 @@ class ReplicaAlterLogDirsThreadTest {
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val initialFetchOffset = 100
-    val futureReplicaLEO = 111
 
     //Stubs
     expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 8eba824..3a63b3a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.server
 
+import scala.collection.Seq
+
 import org.junit.{After, Before, Test}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index bb4cbda..544c599 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -518,7 +518,6 @@ class ReplicaFetcherThreadTest {
     val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val initialFetchOffset = 100
-    val initialLeo = 300
 
     //Stubs
     expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index e1a3e2c..9e39b5f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -190,7 +190,8 @@ class ReplicaManagerQuotasTest {
     assertFalse("Out of sync replica should not complete", setupDelayedFetch(isReplicaInSync = false).tryComplete())
   }
 
-  def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
+  def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record,
+                 bothReplicasInSync: Boolean = false): Unit = {
     val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler])
 
@@ -257,7 +258,7 @@ class ReplicaManagerQuotasTest {
   }
 
   @After
-  def tearDown() {
+  def tearDown(): Unit = {
     if (replicaManager != null)
       replicaManager.shutdown(false)
     metrics.close()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5778650..0ed7ff6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -46,7 +46,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{Map, Seq}
 
 class ReplicaManagerTest {
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index dbb52e7..284d9ab 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -31,7 +31,7 @@ class ReplicationQuotaManagerTest {
   private val time = new MockTime
 
   @Test
-  def shouldThrottleOnlyDefinedReplicas() {
+  def shouldThrottleOnlyDefinedReplicas(): Unit = {
     val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), newMetrics, QuotaType.Fetch, time)
     quota.markThrottled("topic1", Seq(1, 2, 3))
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 45c21d2..4583665 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -51,7 +51,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
   var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
 
   @After
-  override def tearDown() {
+  override def tearDown(): Unit = {
     producer.close()
     shutdownServers(brokers)
     super.tearDown()
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index afa0d51..d5e6cb1 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -18,6 +18,8 @@ package kafka.server
 
 import java.util.Properties
 
+import scala.collection.Seq
+
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils
 import org.junit.{After, Before, Test}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index e3b96db..000fea7 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -19,6 +19,9 @@ package kafka.server.epoch
 
 import java.io.File
 
+import scala.collection.Seq
+import scala.collection.mutable.ListBuffer
+
 import kafka.server.checkpoints.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import kafka.utils.TestUtils
@@ -26,8 +29,6 @@ import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.mutable.ListBuffer
-
 /**
   * Unit test for the LeaderEpochFileCache.
   */
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index ef3ac85..e6f9156 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -278,7 +278,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
     def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
       val partitionData = partitions.mapValues(
-        new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), _))
+        new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), _)).toMap
 
       val request = OffsetsForLeaderEpochRequest.Builder.forFollower(
         ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, partitionData.asJava, 1)
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 1870a49..fb7b07e 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -18,6 +18,9 @@ package kafka.utils
 
 import java.io.{File, BufferedWriter, FileWriter}
 import java.util.Properties
+
+import scala.collection.Seq
+
 import kafka.server.KafkaConfig
 import org.apache.kafka.common.utils.Java
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 98fbc4e..d97f6c1 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -62,8 +62,10 @@ import org.junit.Assert._
 import org.scalatest.Assertions.fail
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, Seq, mutable}
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{Await, ExecutionContext, Future}
 
 /**
  * Utility functions to help with testing
@@ -204,10 +206,14 @@ object TestUtils extends Logging {
     * Shutdown `servers` and delete their log directories.
     */
   def shutdownServers(servers: Seq[KafkaServer]) {
-    servers.par.foreach { s =>
-      s.shutdown()
-      CoreUtils.delete(s.config.logDirs)
+    import ExecutionContext.Implicits._
+    val future = Future.traverse(servers) { s =>
+      Future {
+        s.shutdown()
+        CoreUtils.delete(s.config.logDirs)
+      }
     }
+    Await.result(future, FiniteDuration(5, TimeUnit.MINUTES))
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala b/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala
index 640feed..04e5925 100644
--- a/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.utils.json
 
+import scala.collection.Seq
+
 import com.fasterxml.jackson.databind.{ObjectMapper, JsonMappingException}
 import org.junit.Test
 import org.junit.Assert._
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 5a4336b..19c0e96 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -38,7 +38,7 @@ import org.junit.{After, Test}
 import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
-import scala.collection.{Map, immutable}
+import scala.collection.{Map, Seq, immutable}
 
 class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 32ca969..288b62e 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -48,7 +48,7 @@ class EmbeddedZookeeper() extends Logging {
   factory.startup(zookeeper)
   val port = zookeeper.getClientPort
 
-  def shutdown() {
+  def shutdown(): Unit = {
     CoreUtils.swallow(zookeeper.shutdown(), this)
     CoreUtils.swallow(factory.shutdown(), this)
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 5d8846e..ddadac0 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -785,9 +785,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
     leaderIsrAndControllerEpochs(0, 0)
 
-  val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr).toMap
+
   private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
-    leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+    leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr).toMap
 
   private def checkUpdateLeaderAndIsrResult(
                   expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
@@ -849,7 +850,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
 
     checkUpdateLeaderAndIsrResult(
-      leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+      leaderIsrs(state = 2, zkVersion = 2).filter { case (tp, _) => tp == topicPartition10 },
       ArrayBuffer(topicPartition11),
       Map(
         topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
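
The two `KafkaZkClientTest` hunks above follow the commit message: in Scala 2.13, `mapValues` and `filterKeys` return views that do not implement `Map`, so the result is either materialized with `toMap` or rewritten with `filter`. A small sketch of both rewrites, assuming a hypothetical map of topic names to epochs rather than the test's real types:

```scala
import scala.collection.Map

object StrictMapSketch {

  // Hypothetical data; the real test maps TopicPartition to LeaderAndIsr.
  val epochs: Map[String, Int] = Map("topic-0" -> 1, "topic-1" -> 2)

  // mapValues yields a view in 2.13, so toMap is added to obtain a Map.
  // In 2.11 and 2.12 the extra toMap simply builds a strict copy.
  val bumped: Map[String, Int] = epochs.mapValues(_ + 1).toMap

  // filter on the key/value pair replaces filterKeys and returns a Map
  // in all three Scala versions.
  val onlyFirst: Map[String, Int] = epochs.filter { case (name, _) => name == "topic-0" }
}
```

Note that `mapValues` is deprecated in 2.13, so this form compiles there with a deprecation warning; it is kept here because it is the shape the diff uses.
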
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index d500373..7ed76aa 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -21,6 +21,8 @@ import java.util.UUID
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, Executors, Semaphore, TimeUnit}
 
+import scala.collection.Seq
+
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, Meter, MetricName}
 import kafka.zk.ZooKeeperTestHarness
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 01f10f5..f3924e0 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -100,9 +100,10 @@ versions += [
   powermock: "2.0.2",
   reflections: "0.9.11",
   rocksDB: "5.18.3",
+  scalaCollectionCompat: "2.1.0",
   scalafmt: "1.5.1",
-  scalatest: "3.0.8",
   scalaJava8Compat : "0.9.0",
+  scalatest: "3.0.8",
   scoverage: "1.4.0",
   scoveragePlugin: "2.5.0",
   shadowPlugin: "4.0.4",
@@ -166,11 +167,12 @@ libs += [
   powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock",
   reflections: "org.reflections:reflections:$versions.reflections",
   rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
+  scalaCollectionCompat: "org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat",
+  scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",
   scalaLibrary: "org.scala-lang:scala-library:$versions.scala",
   scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLogging",
   scalaReflect: "org.scala-lang:scala-reflect:$versions.scala",
   scalatest: "org.scalatest:scalatest_$versions.baseScala:$versions.scalatest",
-  scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",
   scoveragePlugin: "org.scoverage:scalac-scoverage-plugin_$versions.baseScala:$versions.scoverage",
   scoverageRuntime: "org.scoverage:scalac-scoverage-runtime_$versions.baseScala:$versions.scoverage",
   slf4jApi: "org.slf4j:slf4j-api:$versions.slf4j",
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 6f91df0..e843d9f 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -76,7 +76,9 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
             NM_METHOD_NAMING_CONVENTION: Method names should start with a lower case letter.
             EC_NULL_ARG: Call to equals(null)
             NP_ALWAYS_NULL: Null pointer dereference
-            MS_CANNOT_BE_FINAL: Field isn't final and can't be protected from malicious code -->
+            MS_CANNOT_BE_FINAL: Field isn't final and can't be protected from malicious code
+            IC_INIT_CIRCULARITY: Initialization circularity
+            SE_NO_SUITABLE_CONSTRUCTOR: Class is Serializable but its superclass doesn't define a void constructor -->
         <Source name="~.*\.scala" />
         <Or>
             <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE"/>
@@ -96,6 +98,8 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
             <Bug pattern="EC_NULL_ARG"/>
             <Bug pattern="NP_ALWAYS_NULL"/>
             <Bug pattern="MS_CANNOT_BE_FINAL"/>
+            <Bug pattern="IC_INIT_CIRCULARITY"/>
+            <Bug pattern="SE_NO_SUITABLE_CONSTRUCTOR"/>
         </Or>
     </Match>
 
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 98fb12b..f3a9b43 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -21,7 +21,13 @@ package org.apache.kafka.streams.scala
 package kstream
 
 import org.apache.kafka.streams.kstream.internals.KTableImpl
-import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KTable => KTableJ, _}
+import org.apache.kafka.streams.kstream.{
+  SessionWindows,
+  Window,
+  Windows,
+  KGroupedStream => KGroupedStreamJ,
+  KTable => KTableJ
+}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 5df9de8..382b040 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -21,7 +21,15 @@ package org.apache.kafka.streams.scala
 package kstream
 
 import org.apache.kafka.streams.KeyValue
-import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
+import org.apache.kafka.streams.kstream.{
+  GlobalKTable,
+  JoinWindows,
+  Printed,
+  TransformerSupplier,
+  ValueTransformerSupplier,
+  ValueTransformerWithKeySupplier,
+  KStream => KStreamJ
+}
 import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 import org.apache.kafka.streams.scala.ImplicitConversions._
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 20ee08b..56549ad 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -21,7 +21,7 @@ package org.apache.kafka.streams.scala
 package kstream
 
 import org.apache.kafka.common.utils.Bytes
-import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
+import org.apache.kafka.streams.kstream.{Suppressed, ValueTransformerWithKeySupplier, KTable => KTableJ}
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.state.KeyValueStore
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index f820c3e..8752805 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.scala
 
 import java.util.Properties
 
-import org.apache.kafka.streams._
+import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.scala.utils.StreamToTableJoinScalaIntegrationTestBase
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index a1c516b..a265f15 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -47,7 +47,7 @@ import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext,
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.Serdes._
 import org.apache.kafka.streams.scala.kstream._
-import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, _}
+import org.apache.kafka.streams.{KeyValue, StreamsConfig, TopologyDescription, StreamsBuilder => StreamsBuilderJ}
 import org.junit.Assert._
 import org.junit._
 
@@ -103,7 +103,7 @@ class TopologyTest {
     // build the Scala topology
     def getTopologyScala: TopologyDescription = {
 
-      import Serdes._
+      import org.apache.kafka.streams.scala.Serdes._
 
       val streamBuilder = new StreamsBuilder
       val textLines = streamBuilder.stream[String, String](inputTopic)
@@ -230,12 +230,12 @@ class TopologyTest {
           .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
             override def get(): Transformer[String, String, KeyValue[String, String]] =
               new Transformer[String, String, KeyValue[String, String]] {
-                override def init(context: ProcessorContext): Unit = Unit
+                override def init(context: ProcessorContext): Unit = ()
 
                 override def transform(key: String, value: String): KeyValue[String, String] =
                   new KeyValue(key, value.toLowerCase)
 
-                override def close(): Unit = Unit
+                override def close(): Unit = ()
               }
           })
           .groupBy((_, v) => v)
@@ -254,12 +254,12 @@ class TopologyTest {
         .transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
           override def get(): Transformer[String, String, KeyValue[String, String]] =
             new Transformer[String, String, KeyValue[String, String]] {
-              override def init(context: ProcessorContext): Unit = Unit
+              override def init(context: ProcessorContext): Unit = ()
 
               override def transform(key: String, value: String): KeyValue[String, String] =
                 new KeyValue(key, value.toLowerCase)
 
-              override def close(): Unit = Unit
+              override def close(): Unit = ()
             }
         })
 
@@ -385,7 +385,7 @@ class TopologyTest {
         .mapValues[String](valueMapper)
         .process(processorSupplier)
 
-      val stream2 = mappedStream.groupByKey
+      val stream2: KStreamJ[String, Integer] = mappedStream.groupByKey
         .aggregate(initializer, aggregator, MaterializedJ.`with`(Serdes.String, SerdesJ.Integer))
         .toStream
       stream2.to(AGGREGATION_TOPIC, Produced.`with`(Serdes.String, SerdesJ.Integer))
@@ -407,10 +407,10 @@ class TopologyTest {
         .filter(new Predicate[String, String] {
           override def test(key: String, value: String): Boolean = key == "A"
         })
-        .join(stream2,
-              valueJoiner2,
-              JoinWindows.of(Duration.ofMillis(5000)),
-              JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.Integer))
+        .join[Integer, String](stream2,
+                               valueJoiner2,
+                               JoinWindows.of(Duration.ofMillis(5000)),
+                               JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.Integer))
         .to(JOINED_TOPIC)
 
       mappedStream
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
index 8980efd..101647e 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -25,16 +25,15 @@ import java.util.regex.Pattern
 import org.junit.Assert._
 import org.junit._
 import org.junit.rules.TemporaryFolder
-import org.apache.kafka.streams.KeyValue
-import org.apache.kafka.streams._
+import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
 import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.test.{IntegrationTest, TestUtils}
 import ImplicitConversions._
+import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer, StringSerializer}
 import org.junit.experimental.categories.Category
 
 /**

