kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [2/2] kafka git commit: KAFKA-4377; remove deprecated scala.collection.JavaConversions calls
Date Fri, 18 Nov 2016 13:54:56 GMT
KAFKA-4377; remove deprecated scala.collection.JavaConversions calls

JavaConversions are deprecated in 2.12 in favour of JavaConverters.

Author: Bernard Leach <leachbj@bouncycastle.org>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2101 from leachbj/4377-java-converters


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e7ba700
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e7ba700
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e7ba700

Branch: refs/heads/trunk
Commit: 0e7ba70008fa8961ae55861a4dd3a5370d1d5a69
Parents: 079ea89
Author: Bernard Leach <leachbj@bouncycastle.org>
Authored: Fri Nov 18 04:10:43 2016 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Nov 18 04:29:55 2016 +0000

----------------------------------------------------------------------
 core/src/main/scala/kafka/Kafka.scala           |  4 ++--
 .../main/scala/kafka/admin/AdminClient.scala    | 16 ++++++-------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  8 ++-----
 .../main/scala/kafka/admin/ConfigCommand.scala  | 24 ++++++++++----------
 .../main/scala/kafka/admin/TopicCommand.scala   | 16 ++++++-------
 .../main/scala/kafka/api/FetchResponse.scala    |  8 +++----
 .../scala/kafka/consumer/BaseConsumer.scala     | 22 ++++--------------
 .../consumer/ZookeeperConsumerConnector.scala   | 17 +++++++-------
 .../consumer/ZookeeperTopicEventWatcher.scala   |  8 +++----
 .../controller/PartitionStateMachine.scala      | 13 ++++-------
 .../kafka/controller/ReplicaStateMachine.scala  | 16 ++++++-------
 .../kafka/javaapi/OffsetCommitRequest.scala     |  7 ++----
 .../kafka/javaapi/OffsetCommitResponse.scala    |  7 ++----
 .../kafka/javaapi/OffsetFetchRequest.scala      | 10 ++------
 .../kafka/javaapi/OffsetFetchResponse.scala     |  7 ++----
 .../scala/kafka/javaapi/OffsetRequest.scala     |  8 ++-----
 .../scala/kafka/javaapi/TopicMetadata.scala     | 22 ++++--------------
 .../kafka/javaapi/TopicMetadataRequest.scala    |  7 ++----
 .../consumer/ZookeeperConsumerConnector.scala   | 11 ++++-----
 core/src/main/scala/kafka/log/Log.scala         | 13 ++++-------
 core/src/main/scala/kafka/log/LogManager.scala  |  7 +++---
 .../scala/kafka/metrics/KafkaMetricsGroup.scala |  3 ++-
 .../scala/kafka/server/ClientQuotaManager.scala |  4 ++--
 .../src/main/scala/kafka/server/KafkaApis.scala | 22 ++++++++----------
 .../main/scala/kafka/server/KafkaConfig.scala   | 19 +++-------------
 .../scala/kafka/tools/ConsoleProducer.scala     |  7 +++---
 .../scala/kafka/tools/ConsumerPerformance.scala | 16 +++++++------
 .../scala/kafka/tools/EndToEndLatency.scala     | 11 ++++-----
 .../scala/kafka/tools/ExportZkOffsets.scala     |  5 ++--
 core/src/main/scala/kafka/tools/JmxTool.scala   |  8 +++----
 .../main/scala/kafka/tools/MirrorMaker.scala    |  6 ++---
 .../scala/kafka/tools/ReplayLogProducer.scala   |  4 ++--
 .../scala/kafka/tools/SimpleConsumerShell.scala |  4 ++--
 core/src/main/scala/kafka/utils/Pool.scala      | 18 +++++----------
 .../kafka/utils/VerifiableProperties.scala      |  8 +++----
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 11 +++------
 .../integration/kafka/api/AdminClientTest.scala | 11 +++++----
 .../kafka/api/ConsumerBounceTest.scala          | 17 +++++++-------
 .../other/kafka/TestPurgatoryPerformance.scala  |  4 ++--
 .../message/BaseMessageSetTestCases.scala       | 10 +++-----
 .../unit/kafka/log/BrokerCompressionTest.scala  |  6 ++---
 .../scala/unit/kafka/metrics/MetricsTest.scala  |  6 ++---
 .../kafka/server/ApiVersionsRequestTest.scala   |  4 ++--
 .../test/scala/unit/kafka/utils/TestUtils.scala |  3 +--
 44 files changed, 181 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 6b551ce..88508b5 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -24,7 +24,7 @@ import kafka.server.{KafkaServer, KafkaServerStartable}
 import kafka.utils.{CommandLineUtils, Logging}
 import org.apache.kafka.common.utils.Utils
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object Kafka extends Logging {
 
@@ -47,7 +47,7 @@ object Kafka extends Logging {
         CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
       }
 
-      props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt)))
+      props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala))
     }
     props
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 592fecf..9cd4823 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -13,7 +13,7 @@
 package kafka.admin
 
 import java.nio.ByteBuffer
-import java.util.Properties
+import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.common.KafkaException
@@ -23,7 +23,6 @@ import org.apache.kafka.clients._
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
-import org.apache.kafka.common.errors.DisconnectException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
 import org.apache.kafka.common.protocol.types.Struct
@@ -32,7 +31,6 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 class AdminClient(val time: Time,
@@ -76,11 +74,11 @@ class AdminClient(val time: Time,
   def listGroups(node: Node): List[GroupOverview] = {
     val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest()).asInstanceOf[ListGroupsResponse]
     Errors.forCode(response.errorCode()).maybeThrow()
-    response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
+    response.groups().asScala.map(group => GroupOverview(group.groupId(), group.protocolType())).toList
   }
 
   private def findAllBrokers(): List[Node] = {
-    val request = new MetadataRequest(List[String]())
+    val request = new MetadataRequest(Collections.emptyList[String])
     val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
     val errors = response.errors()
     if (!errors.isEmpty)
@@ -135,7 +133,7 @@ class AdminClient(val time: Time,
 
   def describeConsumerGroup(groupId: String): ConsumerGroupSummary = {
     val coordinator = findCoordinator(groupId)
-    val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
+    val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(Collections.singletonList(groupId)))
     val response = responseBody.asInstanceOf[DescribeGroupsResponse]
     val metadata = response.groups.get(groupId)
     if (metadata == null)
@@ -144,11 +142,11 @@ class AdminClient(val time: Time,
       throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")
 
     Errors.forCode(metadata.errorCode()).maybeThrow()
-    val consumers = metadata.members.map { consumer =>
+    val consumers = metadata.members.asScala.map { consumer =>
       ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
         case "Stable" =>
           val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
-          assignment.partitions.toList
+          assignment.partitions.asScala.toList
         case _ =>
           List()
       })
@@ -190,7 +188,7 @@ object AdminClient {
     config
   }
 
-  class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals, false)
+  class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
 
   def createSimplePlaintext(brokerUrl: String): AdminClient = {
     val config = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokerUrl)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index d3ce217..e95d327 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.MetadataResponse
 
 import scala.collection._
-import JavaConverters._
+import scala.collection.JavaConverters._
 import mutable.ListBuffer
 import scala.collection.mutable
 import collection.Map
@@ -560,11 +560,7 @@ object AdminUtils extends Logging with AdminUtilities {
    * Write out the entity config to zk, if there is any
    */
   private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
-    val configMap: mutable.Map[String, String] = {
-      import JavaConversions._
-      config
-    }
-    val map = Map("version" -> 1, "config" -> configMap)
+    val map = Map("version" -> 1, "config" -> config.asScala)
     zkUtils.updatePersistentPath(entityPath, Json.encode(map))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 34df6b0..aa3780e 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -20,13 +20,13 @@ package kafka.admin
 import java.util.Properties
 import joptsimple._
 import kafka.common.Config
-import kafka.log.{LogConfig}
+import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, QuotaId}
 import kafka.server.{DynamicConfig, ConfigType}
 import kafka.utils.{CommandLineUtils, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection._
 
 
@@ -113,7 +113,7 @@ object ConfigCommand extends Config {
       // When describing all users, don't include empty user nodes with only <user, client> quota overrides.
       if (!configs.isEmpty || !describeAllUsers) {
         println("Configs for %s are %s"
-          .format(entity, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+          .format(entity, configs.asScala.map(kv => kv._1 + "=" + kv._2).mkString(",")))
       }
     }
   }
@@ -138,7 +138,7 @@ object ConfigCommand extends Config {
 
   private[admin] def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = {
     if (opts.options.has(opts.deleteConfig)) {
-      val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfig).map(_.trim())
+      val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfig).asScala.map(_.trim())
       val propsToBeDeleted = new Properties
       configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
       configsToBeDeleted
@@ -214,7 +214,7 @@ object ConfigCommand extends Config {
   }
 
   private[admin] def parseEntity(opts: ConfigCommandOptions): ConfigEntity = {
-    val entityTypes = opts.options.valuesOf(opts.entityType)
+    val entityTypes = opts.options.valuesOf(opts.entityType).asScala
     if (entityTypes.head == ConfigType.User || entityTypes.head == ConfigType.Client)
       parseQuotaEntity(opts)
     else {
@@ -225,9 +225,9 @@ object ConfigCommand extends Config {
   }
 
   private def parseQuotaEntity(opts: ConfigCommandOptions): ConfigEntity = {
-    val types = opts.options.valuesOf(opts.entityType)
+    val types = opts.options.valuesOf(opts.entityType).asScala
     val namesIterator = opts.options.valuesOf(opts.entityName).iterator
-    val names = opts.options.specs
+    val names = opts.options.specs.asScala
                     .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
                     .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "")
 
@@ -235,7 +235,7 @@ object ConfigCommand extends Config {
       throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter")
 
     val reverse = types.size == 2 && types(0) == ConfigType.Client
-    val entityTypes = if (reverse) types.reverse else types.toBuffer
+    val entityTypes = if (reverse) types.reverse else types
     val sortedNames = (if (reverse && names.length == 2) names.reverse else names).iterator
 
     def sanitizeName(entityType: String, name: String) = {
@@ -276,9 +276,9 @@ object ConfigCommand extends Config {
     val nl = System.getProperty("line.separator")
     val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
             "For entity_type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
-            "For entity_type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.map("\t" + _).mkString(nl, nl, nl) +
-            "For entity_type '" + ConfigType.User + "': " + DynamicConfig.Client.names.map("\t" + _).mkString(nl, nl, nl) +
-            "For entity_type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity_type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity_type '" + ConfigType.User + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+            "For entity_type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
             s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
             .withRequiredArg
             .ofType(classOf[String])
@@ -302,7 +302,7 @@ object ConfigCommand extends Config {
       CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
       CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig))
-      val entityTypeVals = options.valuesOf(entityType)
+      val entityTypeVals = options.valuesOf(entityType).asScala
       if(options.has(alterOpt)) {
         if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client)) {
           if (!options.has(entityName) && !options.has(entityDefault))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 2fcc2ce..1078ba2 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -20,8 +20,8 @@ package kafka.admin
 import java.util.Properties
 import joptsimple._
 import kafka.common.{AdminCommandFailedException, Topic}
-import kafka.consumer.{ConsumerConfig => OldConsumerConfig, Whitelist}
-import kafka.log.{Defaults, LogConfig}
+import kafka.consumer.Whitelist
+import kafka.log.LogConfig
 import kafka.server.ConfigType
 import kafka.utils.ZkUtils._
 import kafka.utils._
@@ -29,9 +29,9 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
-import scala.collection.JavaConversions._
+
+import scala.collection.JavaConverters._
 import scala.collection._
-import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
 
 
 object TopicCommand extends Logging {
@@ -199,8 +199,8 @@ object TopicCommand extends Logging {
           val describePartitions: Boolean = !reportOverriddenConfigs
           val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
           if (describeConfigs) {
-            val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
-            if (!reportOverriddenConfigs || configs.size() != 0) {
+            val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
+            if (!reportOverriddenConfigs || configs.nonEmpty) {
               val numPartitions = topicPartitionAssignment.size
               val replicationFactor = topicPartitionAssignment.head._2.size
               println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s"
@@ -229,7 +229,7 @@ object TopicCommand extends Logging {
   }
 
   def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
-    val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
+    val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
     require(configsToBeAdded.forall(config => config.length == 2),
       "Invalid topic config: all configs to be added must be in the format \"key=val\".")
     val props = new Properties
@@ -244,7 +244,7 @@ object TopicCommand extends Logging {
 
   def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
     if (opts.options.has(opts.deleteConfigOpt)) {
-      val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
+      val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).asScala.map(_.trim())
       val propsToBeDeleted = new Properties
       configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
       LogConfig.validateNames(propsToBeDeleted)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index d31d4ba..5e2a999 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -21,14 +21,14 @@ import java.nio.ByteBuffer
 import java.nio.channels.GatheringByteChannel
 
 import kafka.common.TopicAndPartition
-import kafka.message.{MessageSet, ByteBufferMessageSet}
+import kafka.message.{ByteBufferMessageSet, MessageSet}
 import kafka.api.ApiUtils._
 import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.network.{Send, MultiSend}
+import org.apache.kafka.common.network.{MultiSend, Send}
 import org.apache.kafka.common.protocol.Errors
 
 import scala.collection._
-import JavaConverters._
+import scala.collection.JavaConverters._
 
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
@@ -139,7 +139,7 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
   buffer.rewind()
 
   private val sends = new MultiSend(dest,
-                            JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2))))
+    topicData.partitionData.map(d => new PartitionDataSend(d._1, d._2): Send).asJava)
 
   override def writeTo(channel: GatheringByteChannel): Long = {
     if (completed)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 6e232a8..c1ee7cd 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -17,24 +17,14 @@
 
 package kafka.consumer
 
-import java.util.Properties
+import java.util.{Collections, Properties}
 import java.util.regex.Pattern
 
-import kafka.api.FetchRequestBuilder
 import kafka.api.OffsetRequest
-import kafka.api.Request
-import kafka.client.ClientUtils
-import kafka.cluster.BrokerEndPoint
 import kafka.common.StreamEndException
 import kafka.message.Message
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndOffset
-import kafka.utils.ToolsUtils
-
-import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.TopicPartition
 
 /**
@@ -60,8 +50,6 @@ case class BaseConsumerRecord(topic: String,
 class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
 
-  import scala.collection.JavaConversions._
-
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
   consumerInit()
   var recordIter = consumer.poll(0).iterator
@@ -74,7 +62,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
         // default to latest if no offset is provided
         seek(topic, partitionId, OffsetRequest.LatestTime)
       case (Some(topic), None, None, None) =>
-        consumer.subscribe(List(topic))
+        consumer.subscribe(Collections.singletonList(topic))
       case (None, None, None, Some(whitelist)) =>
         consumer.subscribe(Pattern.compile(whitelist), new NoOpConsumerRebalanceListener())
       case _ =>
@@ -87,10 +75,10 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
 
   def seek(topic: String, partitionId: Int, offset: Long) {
     val topicPartition = new TopicPartition(topic, partitionId)
-    consumer.assign(List(topicPartition))
+    consumer.assign(Collections.singletonList(topicPartition))
     offset match {
-      case OffsetRequest.EarliestTime => consumer.seekToBeginning(List(topicPartition))
-      case OffsetRequest.LatestTime => consumer.seekToEnd(List(topicPartition))
+      case OffsetRequest.EarliestTime => consumer.seekToBeginning(Collections.singletonList(topicPartition))
+      case OffsetRequest.LatestTime => consumer.seekToEnd(Collections.singletonList(topicPartition))
       case _ => consumer.seek(topicPartition, offset)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbe7089..22a0c9a 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
 import scala.collection._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -696,9 +696,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             if (topicRegistry.size == 0)
               new java.util.HashMap[String, java.util.Set[java.lang.Integer]]
             else
-              mapAsJavaMap(topicRegistry.map(topics =>
-                topics._1 -> topics._2.keys
-              ).toMap).asInstanceOf[java.util.Map[String, java.util.Set[java.lang.Integer]]]
+              topicRegistry.map(topics =>
+                topics._1 -> topics._2.keys   // note this is incorrect, see KAFKA-2284
+              ).toMap.asJava.asInstanceOf[java.util.Map[String, java.util.Set[java.lang.Integer]]]
           )
         }
         releasePartitionOwnership(topicRegistry)
@@ -755,14 +755,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                 case (topic, partitionOwnerShips) =>
                   val partitionOwnershipForTopicScalaMap = partitionOwnerShips.map({
                     case (topicAndPartition, consumerThreadId) =>
-                      topicAndPartition.partition -> consumerThreadId
-                  })
-                  topic -> mapAsJavaMap(collection.mutable.Map(partitionOwnershipForTopicScalaMap.toSeq:_*))
-                    .asInstanceOf[java.util.Map[java.lang.Integer, ConsumerThreadId]]
+                      (topicAndPartition.partition: Integer) -> consumerThreadId
+                  }).toMap
+                  topic -> partitionOwnershipForTopicScalaMap.asJava
               })
               consumerRebalanceListener.beforeStartingFetchers(
                 consumerIdString,
-                mapAsJavaMap(collection.mutable.Map(partitionAssigmentMapForCallback.toSeq:_*))
+                partitionAssigmentMapForCallback.asJava
               )
             }
             updateFetcher(cluster)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index 0cd22f0..b9f2d41 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -17,9 +17,9 @@
 
 package kafka.consumer
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import kafka.utils.{ZkUtils, Logging}
-import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
 class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
@@ -37,7 +37,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
       new ZkSessionExpireListener(topicEventListener))
 
     val topics = zkUtils.zkClient.subscribeChildChanges(
-      ZkUtils.BrokerTopicsPath, topicEventListener).toList
+      ZkUtils.BrokerTopicsPath, topicEventListener)
 
     // call to bootstrap topic list
     topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics)
@@ -64,7 +64,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
       lock.synchronized {
         try {
           if (zkUtils != null) {
-            val latestTopics = zkUtils.zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
+            val latestTopics = zkUtils.zkClient.getChildren(ZkUtils.BrokerTopicsPath).asScala
             debug("all topics: %s".format(latestTopics))
             eventHandler.handleTopicEvent(latestTopics)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ee94b46..a5285c3 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,8 +17,7 @@
 package kafka.controller
 
 import collection._
-import collection.JavaConversions
-import collection.mutable.Buffer
+import collection.JavaConverters._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
@@ -416,9 +415,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         if (hasStarted.get) {
           try {
             val currentChildren = {
-              import JavaConversions._
-              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
-              (children: Buffer[String]).toSet
+              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.asScala.mkString(",")))
+              children.asScala.toSet
             }
             val newTopics = currentChildren -- controllerContext.allTopics
             val deletedTopics = controllerContext.allTopics -- currentChildren
@@ -456,10 +454,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, children : java.util.List[String]) {
       inLock(controllerContext.controllerLock) {
-        var topicsToBeDeleted = {
-          import JavaConversions._
-          (children: Buffer[String]).toSet
-        }
+        var topicsToBeDeleted = children.asScala.toSet
         debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
         val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
         if(nonExistentTopics.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 03887ae..a26f95a 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -17,13 +17,13 @@
 package kafka.controller
 
 import collection._
-import collection.JavaConversions._
+import collection.JavaConverters._
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{TopicAndPartition, StateChangeFailedException}
-import kafka.utils.{ZkUtils, ReplicationUtils, Logging}
+
+import kafka.common.{StateChangeFailedException, TopicAndPartition}
+import kafka.controller.Callbacks.CallbackBuilder
+import kafka.utils.{Logging, ReplicationUtils, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
-import org.apache.log4j.Logger
-import kafka.controller.Callbacks._
 import kafka.utils.CoreUtils._
 
 /**
@@ -300,7 +300,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   }
 
   def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = {
-    val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
+    val deletionStates = Set[ReplicaState](ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
     replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet
   }
 
@@ -351,12 +351,12 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   class BrokerChangeListener() extends IZkChildListener with Logging {
     this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
     def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
-      info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
+      info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.asScala.sorted.mkString(",")))
       inLock(controllerContext.controllerLock) {
         if (hasStarted.get) {
           ControllerStats.leaderElectionTimer.time {
             try {
-              val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
+              val curBrokers = currentBrokerList.asScala.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
               val curBrokerIds = curBrokers.map(_.id)
               val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
               val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 456c3c4..1924d5e 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -18,6 +18,7 @@
 package kafka.javaapi
 
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import scala.collection.JavaConverters._
 
 class OffsetCommitRequest(groupId: String,
                           requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
@@ -25,11 +26,7 @@ class OffsetCommitRequest(groupId: String,
                           clientId: String,
                           versionId: Short) {
   val underlying = {
-    val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = {
-      import collection.JavaConversions._
-
-      requestInfo.toMap
-    }
+    val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = requestInfo.asScala.toMap
     kafka.api.OffsetCommitRequest(
       groupId = groupId,
       requestInfo = scalaMap,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
index b222329..c79f5b6 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -20,14 +20,11 @@ package kafka.javaapi
 import java.nio.ByteBuffer
 
 import kafka.common.TopicAndPartition
-import collection.JavaConversions
+import scala.collection.JavaConverters._
 
 class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitResponse) {
 
-  def errors: java.util.Map[TopicAndPartition, Short] = {
-    import JavaConversions._
-    underlying.commitStatus
-  }
+  def errors: java.util.Map[TopicAndPartition, Short] = underlying.commitStatus.asJava
 
   def hasError = underlying.hasError
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
index 818ae33..8eb0d47 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
@@ -18,9 +18,7 @@
 package kafka.javaapi
 
 import kafka.common.TopicAndPartition
-import scala.collection.mutable
-import collection.JavaConversions
-import java.nio.ByteBuffer
+import collection.JavaConverters._
 
 class OffsetFetchRequest(groupId: String,
                          requestInfo: java.util.List[TopicAndPartition],
@@ -37,13 +35,9 @@ class OffsetFetchRequest(groupId: String,
   }
 
   val underlying = {
-    val scalaSeq = {
-      import JavaConversions._
-      requestInfo: mutable.Buffer[TopicAndPartition]
-    }
     kafka.api.OffsetFetchRequest(
       groupId = groupId,
-      requestInfo = scalaSeq,
+      requestInfo = requestInfo.asScala,
       versionId = versionId,
       correlationId = correlationId,
       clientId = clientId

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
index c4bdb12..01aa8e8 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
@@ -20,14 +20,11 @@ package kafka.javaapi
 import java.nio.ByteBuffer
 
 import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
-import collection.JavaConversions
+import collection.JavaConverters._
 
 class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse) {
 
-  def offsets: java.util.Map[TopicAndPartition, OffsetMetadataAndError] = {
-    import JavaConversions._
-    underlying.requestInfo
-  }
+  def offsets: java.util.Map[TopicAndPartition, OffsetMetadataAndError] = underlying.requestInfo.asJava
 
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index c8a0ded..21997d3 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -19,8 +19,7 @@ package kafka.javaapi
 
 import kafka.common.TopicAndPartition
 import kafka.api.{Request, PartitionOffsetRequestInfo}
-import scala.collection.mutable
-import java.nio.ByteBuffer
+import scala.collection.JavaConverters._
 
 
 class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffsetRequestInfo],
@@ -28,10 +27,7 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
                     clientId: String) {
 
   val underlying = {
-    val scalaMap = {
-      import collection.JavaConversions._
-      (requestInfo: mutable.Map[TopicAndPartition, PartitionOffsetRequestInfo]).toMap
-    }
+    val scalaMap = requestInfo.asScala.toMap
     kafka.api.OffsetRequest(
       requestInfo = scalaMap,
       versionId = versionId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index 4ef8321..4e2631f 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,20 +17,14 @@
 package kafka.javaapi
 
 import kafka.cluster.BrokerEndPoint
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
 
 private[javaapi] object MetadataListImplicits {
   implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
-  java.util.List[kafka.javaapi.TopicMetadata] = {
-    import JavaConversions._
-    topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
-  }
+  java.util.List[kafka.javaapi.TopicMetadata] = topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)).asJava
 
   implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
-  java.util.List[kafka.javaapi.PartitionMetadata] = {
-    import JavaConversions._
-    partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
-  }
+  java.util.List[kafka.javaapi.PartitionMetadata] = partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)).asJava
 }
 
 class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -57,15 +51,9 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
     underlying.leader
   }
 
-  def replicas: java.util.List[BrokerEndPoint] = {
-    import JavaConversions._
-    underlying.replicas
-  }
+  def replicas: java.util.List[BrokerEndPoint] = underlying.replicas.asJava
 
-  def isr: java.util.List[BrokerEndPoint] = {
-    import JavaConversions._
-    underlying.isr
-  }
+  def isr: java.util.List[BrokerEndPoint] = underlying.isr.asJava
 
   def errorCode: Short = underlying.errorCode
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index f625ba0..efd5405 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import kafka.api._
 import org.apache.kafka.common.protocol.ApiKeys
 
-import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 class TopicMetadataRequest(val versionId: Short,
                            val correlationId: Int,
@@ -29,10 +29,7 @@ class TopicMetadataRequest(val versionId: Short,
                            val topics: java.util.List[String])
     extends RequestOrResponse(Some(ApiKeys.METADATA.id)) {
 
-  val underlying: kafka.api.TopicMetadataRequest = {
-    import scala.collection.JavaConversions._
-    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
-  }
+  val underlying: kafka.api.TopicMetadataRequest = new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics.asScala)
 
   def this(topics: java.util.List[String]) =
     this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 6347bfd..e145075 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -19,7 +19,7 @@ package kafka.javaapi.consumer
 import kafka.serializer._
 import kafka.consumer._
 import kafka.common.{OffsetAndMetadata, TopicAndPartition, MessageStreamsExistException}
-import scala.collection.{immutable, mutable, JavaConversions}
+import scala.collection.mutable
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConverters._
 
@@ -79,8 +79,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       throw new MessageStreamsExistException(this.getClass.getSimpleName +
                                    " can create message streams at most once",null)
     val scalaTopicCountMap: Map[String, Int] = {
-      import JavaConversions._
-      Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
+      Map.empty[String, Int] ++ topicCountMap.asInstanceOf[java.util.Map[String, Int]].asScala
     }
     val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
     val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
@@ -96,10 +95,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
     createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
 
-  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = {
-    import JavaConversions._
-    underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
-  }
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
+    underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder).asJava
 
   def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
     createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6b57696..4c286d2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,7 +32,8 @@ import org.apache.kafka.common.errors.{UnsupportedForMessageFormatException, Cor
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.ListOffsetRequest
 
-import scala.collection.{Seq, JavaConversions}
+import scala.collection.Seq
+import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.Utils
 
@@ -902,23 +903,19 @@ class Log(val dir: File,
   /**
    * All the log segments in this log ordered from oldest to newest
    */
-  def logSegments: Iterable[LogSegment] = {
-    import JavaConversions._
-    segments.values
-  }
+  def logSegments: Iterable[LogSegment] = segments.values.asScala
 
   /**
    * Get all segments beginning with the segment that includes "from" and ending with the segment
    * that includes up to "to-1" or the end of the log (if to > logEndOffset)
    */
   def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
-    import JavaConversions._
     lock synchronized {
       val floor = segments.floorKey(from)
       if(floor eq null)
-        segments.headMap(to).values
+        segments.headMap(to).values.asScala
       else
-        segments.subMap(floor, true, to, false).values
+        segments.subMap(floor, true, to, false).values.asScala
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 7806eda..a5beb49 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import kafka.utils._
 
 import scala.collection._
+import scala.collection.JavaConverters._
 import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
 import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
@@ -366,10 +367,10 @@ class LogManager(val logDirs: Array[File],
                     time)
       logs.put(topicAndPartition, log)
       info("Created log for partition [%s,%d] in %s with properties {%s}."
-           .format(topicAndPartition.topic, 
-                   topicAndPartition.partition, 
+           .format(topicAndPartition.topic,
+                   topicAndPartition.partition,
                    dataDir.getAbsolutePath,
-                   {import JavaConversions._; config.originals.mkString(", ")}))
+                   config.originals.asScala.mkString(", ")))
       log
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 13b57e3..ede49c4 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -27,6 +27,7 @@ import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, Prod
 import kafka.utils.Logging
 
 import scala.collection.immutable
+import scala.collection.JavaConverters._
 import javax.management.ObjectName
 
 
@@ -198,7 +199,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
   private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
     metricNameList.foreach(metric => {
       val pattern = (".*clientId=" + clientId + ".*").r
-      val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
+      val registeredMetrics = Metrics.defaultRegistry().allMetrics().keySet().asScala
       for (registeredMetric <- registeredMetrics) {
         if (registeredMetric.getGroup == metric.getGroup &&
           registeredMetric.getName == metric.getName &&

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 0c7c26b..eb536f7 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg}
 import org.apache.kafka.common.utils.Time
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Represents the sensors aggregated per client
@@ -442,7 +442,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
             metric.config(getQuotaMetricConfig(newQuota))
           }
       } else {
-          allMetrics.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
+          allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
             case (metricName, metric) =>
               val userTag = if (metricName.tags.containsKey("user")) metricName.tags.get("user") else ""
               val clientIdTag = if (metricName.tags.containsKey("client-id")) metricName.tags.get("client-id") else ""

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 94ae419..f1a9506 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -19,8 +19,8 @@ package kafka.server
 
 import java.nio.ByteBuffer
 import java.lang.{Long => JLong, Short => JShort}
+import java.util.{Collections, Properties}
 import java.util
-import java.util.Properties
 
 import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse, FetchResponsePartitionData}
@@ -345,10 +345,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
 
-      val mergedResponseStatus = responseStatus ++ 
+      val mergedResponseStatus = responseStatus ++
         unauthorizedForWriteRequestInfo.mapValues(_ =>
-           new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++ 
-        nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => 
+           new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++
+        nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
            new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Message.NoTimestamp))
 
       var errorInResponse = false
@@ -1002,15 +1002,13 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {
-    import JavaConversions._
-
     val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
 
     // the callback for sending a join-group response
     def sendResponseCallback(joinResult: JoinGroupResult) {
       val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
       val responseBody = new JoinGroupResponse(request.header.apiVersion, joinResult.errorCode, joinResult.generationId,
-        joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members)
+        joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
 
       trace("Sending join group response %s for correlation id %d to client %s."
         .format(responseBody, request.header.correlationId, request.header.clientId))
@@ -1025,11 +1023,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         JoinGroupResponse.UNKNOWN_PROTOCOL,
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
-        Map.empty[String, ByteBuffer])
+        Collections.emptyMap())
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     } else {
       // let the coordinator to handle join-group
-      val protocols = joinGroupRequest.groupProtocols().map(protocol =>
+      val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
         (protocol.name, Utils.toArray(protocol.metadata))).toList
       coordinator.handleJoinGroup(
         joinGroupRequest.groupId,
@@ -1045,8 +1043,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSyncGroupRequest(request: RequestChannel.Request) {
-    import JavaConversions._
-
     val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
 
     def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
@@ -1061,7 +1057,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         syncGroupRequest.groupId(),
         syncGroupRequest.generationId(),
         syncGroupRequest.memberId(),
-        syncGroupRequest.groupAssignment().mapValues(Utils.toArray),
+        syncGroupRequest.groupAssignment().asScala.mapValues(Utils.toArray),
         sendResponseCallback
       )
     }
@@ -1189,7 +1185,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition { topic =>
       authorize(request.session, Delete, new Resource(auth.Topic, topic))
     }
-    
+
     def sendResponseCallback(results: Map[String, Errors]): Unit = {
       val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
           unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 80568b3..08e50dd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,8 +33,8 @@ import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record.TimestampType
 
-import scala.collection.{JavaConverters, Map, immutable}
-import JavaConverters._
+import scala.collection.{Map, immutable}
+import scala.collection.JavaConverters._
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
@@ -757,20 +757,7 @@ object KafkaConfig {
 
   }
 
-  def configNames() = {
-    import scala.collection.JavaConversions._
-    configDef.names().toList.sorted
-  }
-
-  /**
-    * Check that property names are valid
-    */
-  def validateNames(props: Properties) {
-    import scala.collection.JavaConversions._
-    val names = configDef.names()
-    for (name <- props.keys)
-      require(names.contains(name), "Unknown Kafka configuration \"%s\".".format(name))
-  }
+  def configNames() = configDef.names().asScala.toList.sorted
 
   def fromProps(props: Properties): KafkaConfig =
     fromProps(props, true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index c536359..c6dcfce 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -29,6 +29,8 @@ import joptsimple._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.JavaConverters._
+
 object ConsoleProducer {
 
   def main(args: Array[String]) {
@@ -251,7 +253,6 @@ object ConsoleProducer {
       CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
 
-    import scala.collection.JavaConversions._
     val useOldProducer = options.has(useOldProducerOpt)
     val topic = options.valueOf(topicOpt)
     val brokerList = options.valueOf(brokerListOpt)
@@ -275,8 +276,8 @@ object ConsoleProducer {
     val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
     val socketBuffer = options.valueOf(socketBufferSizeOpt)
-    val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
-    val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
+    val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
+    val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt).asScala)
     /* new producer related configs */
     val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
     val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 2b3f56d..b7087f2 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -19,16 +19,18 @@ package kafka.tools
 
 import java.util
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
+
 import org.apache.log4j.Logger
 import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.TopicPartition
 import kafka.utils.CommandLineUtils
-import java.util.{ Random, Properties }
+import java.util.{Collections, Properties, Random}
+
 import kafka.consumer.Consumer
 import kafka.consumer.ConsumerConnector
 import kafka.consumer.KafkaStream
@@ -60,7 +62,7 @@ object ConsumerPerformance {
     var startMs, endMs = 0L
     if (!config.useOldConsumer) {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
-      consumer.subscribe(List(config.topic))
+      consumer.subscribe(Collections.singletonList(config.topic))
       startMs = System.currentTimeMillis
       consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
       endMs = System.currentTimeMillis
@@ -105,7 +107,7 @@ object ConsumerPerformance {
     // Wait for group join, metadata fetch, etc
     val joinTimeout = 10000
     val isAssigned = new AtomicBoolean(false)
-    consumer.subscribe(topics, new ConsumerRebalanceListener {
+    consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
         isAssigned.set(true)
       }
@@ -119,7 +121,7 @@ object ConsumerPerformance {
       }
       consumer.poll(100)
     }
-    consumer.seekToBeginning(List[TopicPartition]())
+    consumer.seekToBeginning(Collections.emptyList())
 
     // Now start the benchmark
     val startMs = System.currentTimeMillis
@@ -128,9 +130,9 @@ object ConsumerPerformance {
     var currentTimeMillis = lastConsumedTime
 
     while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
-      val records = consumer.poll(100)
+      val records = consumer.poll(100).asScala
       currentTimeMillis = System.currentTimeMillis
-      if (records.count() > 0)
+      if (records.nonEmpty)
         lastConsumedTime = currentTimeMillis
       for (record <- records) {
         messagesRead += 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 9aaad3e..b9c9e3e 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -17,14 +17,13 @@
 
 package kafka.tools
 
-import java.util.{Arrays, Properties}
+import java.util.{Arrays, Collections, Properties}
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.TopicPartition
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.util.Random
 
 
@@ -70,7 +69,7 @@ object EndToEndLatency {
     consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
 
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-    consumer.subscribe(List(topic))
+    consumer.subscribe(Collections.singletonList(topic))
 
     val producerProps = loadProps
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@@ -89,7 +88,7 @@ object EndToEndLatency {
 
     //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
     //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
-    consumer.seekToEnd(List[TopicPartition]())
+    consumer.seekToEnd(Collections.emptyList())
     consumer.poll(0)
 
     var totalTime = 0.0
@@ -122,7 +121,7 @@ object EndToEndLatency {
 
       //Check we only got the one message
       if (recordIter.hasNext) {
-        val count = 1 + recordIter.size
+        val count = 1 + recordIter.asScala.size
         throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
       }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 907e8ac..0b11ec8 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -20,8 +20,8 @@ package kafka.tools
 import java.io.FileWriter
 import joptsimple._
 import kafka.utils.{Logging, ZkUtils, ZKGroupTopicDirs, CommandLineUtils}
-import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.security.JaasUtils
+import scala.collection.JavaConverters._
 
 
 /**
@@ -88,8 +88,7 @@ object ExportZkOffsets extends Logging {
         consumerGroups = zkUtils.getChildren(ZkUtils.ConsumersPath).toList
       }
       else {
-        import scala.collection.JavaConversions._
-        consumerGroups = groups
+        consumerGroups = groups.asScala
       }
       
       for (consumerGrp <- consumerGroups) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 1dcfb19..3538874 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import javax.management._
 import javax.management.remote._
 import joptsimple.OptionParser
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.math._
 import kafka.utils.{CommandLineUtils, Logging}
@@ -85,11 +85,11 @@ object JmxTool extends Logging {
 
     val queries: Iterable[ObjectName] =
       if(options.has(objectNameOpt))
-        options.valuesOf(objectNameOpt).map(new ObjectName(_))
+        options.valuesOf(objectNameOpt).asScala.map(new ObjectName(_))
       else
         List(null)
 
-    val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName])
+    val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
 
     val numExpectedAttributes: Map[ObjectName, Int] =
       if (attributesWhitelistExists)
@@ -123,7 +123,7 @@ object JmxTool extends Logging {
     var attributes = new mutable.HashMap[String, Any]()
     for(name <- names) {
       val mbean = mbsc.getMBeanInfo(name)
-      for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))) {
+      for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
         val attr = attrObj.asInstanceOf[Attribute]
         attributesWhitelist match {
           case Some(allowedAttributes) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 1a6ba69..2cfcb95 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.record.Record
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.util.control.ControlThrowable
 import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
@@ -431,7 +431,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
               val data = mirrorMakerConsumer.receive()
               trace("Sending message with value size %d and offset %d".format(data.value.length, data.offset))
               val records = messageHandler.handle(data)
-              records.foreach(producer.send)
+              records.asScala.foreach(producer.send)
               maybeFlushAndCommitOffsets()
             }
           } catch {
@@ -607,7 +607,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     override def commit() {
-      consumer.commitSync(offsets.map { case (tp, offset) =>  (tp, new OffsetAndMetadata(offset, ""))})
+      consumer.commitSync(offsets.map { case (tp, offset) =>  (tp, new OffsetAndMetadata(offset, ""))}.asJava)
       offsets.clear()
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 4e2c7ef..049f129 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -24,6 +24,7 @@ import kafka.consumer._
 import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}
 import kafka.api.OffsetRequest
 import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
+import scala.collection.JavaConverters._
 
 object ReplayLogProducer extends Logging {
 
@@ -114,8 +115,7 @@ object ReplayLogProducer extends Logging {
     val outputTopic = options.valueOf(outputTopicOpt)
     val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
     val isSync = options.has(syncOpt)
-    import scala.collection.JavaConversions._
-    val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
+    val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 9368bda..1ce0289 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -24,7 +24,7 @@ import kafka.client.ClientUtils
 import kafka.api.{FetchRequestBuilder, OffsetRequest, Request}
 import kafka.cluster.BrokerEndPoint
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import kafka.common.{MessageFormatter, TopicAndPartition}
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.utils.Utils
@@ -117,7 +117,7 @@ object SimpleConsumerShell extends Logging {
     val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)
 
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
 
     val fetchRequestBuilder = new FetchRequestBuilder()
                        .clientId(clientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index f4aa628..1d96238 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -17,10 +17,10 @@
 
 package kafka.utils
 
-import java.util.ArrayList
 import java.util.concurrent._
+
 import collection.mutable
-import collection.JavaConversions
+import collection.JavaConverters._
 import kafka.common.KafkaException
 
 class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] {
@@ -72,16 +72,10 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
 
   def remove(key: K, value: V): Boolean = pool.remove(key, value)
 
-  def keys: mutable.Set[K] = {
-    import JavaConversions._
-    pool.keySet()
-  }
-  
-  def values: Iterable[V] = {
-    import JavaConversions._
-    new ArrayList[V](pool.values())
-  }
-  
+  def keys: mutable.Set[K] = pool.keySet().asScala
+
+  def values: Iterable[V] = pool.values().asScala
+
   def clear() { pool.clear() }
   
   override def size = pool.size

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index 9600b0a..de4f654 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -21,11 +21,12 @@ import java.util.Properties
 import java.util.Collections
 import scala.collection._
 import kafka.message.{CompressionCodec, NoCompressionCodec}
+import scala.collection.JavaConverters._
 
 
 class VerifiableProperties(val props: Properties) extends Logging {
   private val referenceSet = mutable.HashSet[String]()
-  
+
   def this() = this(new Properties)
 
   def containsKey(name: String): Boolean = {
@@ -215,10 +216,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
 
   def verify() {
     info("Verifying properties")
-    val propNames = {
-      import JavaConversions._
-      Collections.list(props.propertyNames).map(_.toString).sorted
-    }
+    val propNames = Collections.list(props.propertyNames).asScala.map(_.toString).sorted
     for(key <- propNames) {
       if (!referenceSet.contains(key) && !key.startsWith("external"))
         warn("Property %s is not valid".format(key))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 787cb8f..de56fe2 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -38,6 +38,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
 
 import scala.collection._
+import scala.collection.JavaConverters._
 
 object ZkUtils {
   val ConsumersPath = "/consumers"
@@ -630,17 +631,11 @@ class ZkUtils(val zkClient: ZkClient,
     dataAndStat
   }
 
-  def getChildren(path: String): Seq[String] = {
-    import scala.collection.JavaConversions._
-    // triggers implicit conversion from java list to scala Seq
-    zkClient.getChildren(path)
-  }
+  def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
 
   def getChildrenParentMayNotExist(path: String): Seq[String] = {
-    import scala.collection.JavaConversions._
-    // triggers implicit conversion from java list to scala Seq
     try {
-      zkClient.getChildren(path)
+      zkClient.getChildren(path).asScala
     } catch {
       case _: ZkNoNodeException => Nil
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index f13f59f..a62922c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -16,15 +16,16 @@
  */
 package kafka.api
 
+import java.util.Collections
+
 import kafka.admin.AdminClient
 import kafka.server.KafkaConfig
-import kafka.utils.{TestUtils, Logging}
+import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
 import org.junit.{Before, Test}
 import org.junit.Assert._
-import scala.collection.JavaConversions._
 
 class AdminClientTest extends IntegrationTestHarness with Logging {
 
@@ -63,7 +64,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testListGroups() {
-    consumers.head.subscribe(List(topic))
+    consumers.head.subscribe(Collections.singletonList(topic))
     TestUtils.waitUntilTrue(() => {
       consumers.head.poll(0)
       !consumers.head.assignment.isEmpty
@@ -78,7 +79,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testGetConsumerGroupSummary() {
-    consumers.head.subscribe(List(topic))
+    consumers.head.subscribe(Collections.singletonList(topic))
     TestUtils.waitUntilTrue(() => {
       consumers.head.poll(0)
       !consumers.head.assignment.isEmpty
@@ -97,7 +98,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testDescribeConsumerGroup() {
-    consumers.head.subscribe(List(topic))
+    consumers.head.subscribe(Collections.singletonList(topic))
     TestUtils.waitUntilTrue(() => {
       consumers.head.poll(0)
       !consumers.head.assignment.isEmpty

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 0900d43..3b81e25 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -13,18 +13,17 @@
 
 package kafka.api
 
-import java.util
+import java.util.Collections
 
 import kafka.server.KafkaConfig
 import kafka.utils.{Logging, ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownMemberIdException}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
-import org.junit.{Test, Before}
+import org.junit.{Before, Test}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 
 
@@ -81,13 +80,13 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     var consumed = 0L
     val consumer = this.consumers.head
 
-    consumer.subscribe(List(topic))
+    consumer.subscribe(Collections.singletonList(topic))
 
     val scheduler = new BounceBrokerScheduler(numIters)
     scheduler.start()
 
     while (scheduler.isRunning.get()) {
-      for (record <- consumer.poll(100)) {
+      for (record <- consumer.poll(100).asScala) {
         assertEquals(consumed, record.offset())
         consumed += 1
       }
@@ -97,7 +96,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
         assertEquals(consumer.position(tp), consumer.committed(tp).offset)
 
         if (consumer.position(tp) == numRecords) {
-          consumer.seekToBeginning(List[TopicPartition]())
+          consumer.seekToBeginning(Collections.emptyList())
           consumed = 0
         }
       } catch {
@@ -118,7 +117,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     this.producers.foreach(_.close)
 
     val consumer = this.consumers.head
-    consumer.assign(List(tp))
+    consumer.assign(Collections.singletonList(tp))
     consumer.seek(tp, 0)
 
     // wait until all the followers have synced the last HW with leader
@@ -133,7 +132,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       val coin = TestUtils.random.nextInt(3)
       if (coin == 0) {
         info("Seeking to end of log")
-        consumer.seekToEnd(List[TopicPartition]())
+        consumer.seekToEnd(Collections.emptyList())
         assertEquals(numRecords.toLong, consumer.position(tp))
       } else if (coin == 1) {
         val pos = TestUtils.random.nextInt(numRecords).toLong

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index 6ccac29..7cb5b6e 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -27,7 +27,7 @@ import kafka.server.{DelayedOperationPurgatory, DelayedOperation}
 import kafka.utils._
 
 import scala.math._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * This is a benchmark test of the purgatory.
@@ -90,7 +90,7 @@ object TestPurgatoryPerformance {
     val pct50 = options.valueOf(pct50Opt).doubleValue
     val verbose = options.valueOf(verboseOpt).booleanValue
 
-    val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().sortBy(_.getName)
+    val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().asScala.sortBy(_.getName)
     val osMXBean = ManagementFactory.getOperatingSystemMXBean
     val latencySamples = new LatencySamples(1000000, pct75, pct50)
     val intervalSamples = new IntervalSamples(1000000, requestRate)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index 80f809e..4a1be11 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -22,15 +22,13 @@ import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.utils.TestUtils
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, CompressionCodec, Message}
+import scala.collection.JavaConverters._
 
 trait BaseMessageSetTestCases extends JUnitSuite {
   
   val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
   def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
-  def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
-    import scala.collection.JavaConversions._
-    messageSet.map(m => m.message).iterator
-  }
+  def toMessageIterator(messageSet: MessageSet): Iterator[Message] = messageSet.asScala.map(m => m.message).iterator
 
   @Test
   def testWrittenEqualsRead {
@@ -40,7 +38,6 @@ trait BaseMessageSetTestCases extends JUnitSuite {
 
   @Test
   def testIteratorIsConsistent() {
-    import scala.collection.JavaConversions._
     val m = createMessageSet(messages)
     // two iterators over the same set should give the same results
     TestUtils.checkEquals(m.iterator, m.iterator)
@@ -48,7 +45,6 @@ trait BaseMessageSetTestCases extends JUnitSuite {
 
   @Test
   def testIteratorIsConsistentWithCompression() {
-    import scala.collection.JavaConversions._
     val m = createMessageSet(messages, DefaultCompressionCodec)
     // two iterators over the same set should give the same results
     TestUtils.checkEquals(m.iterator, m.iterator)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 7487bc5..a050bb3 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -28,7 +28,7 @@ import org.junit.runners.Parameterized.Parameters
 import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.utils.Utils
 import java.util.{Collection, Properties}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 @RunWith(value = classOf[Parameterized])
 class BrokerCompressionTest(messageCompression: String, brokerCompression: String) extends JUnitSuite {
@@ -73,8 +73,8 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
 object BrokerCompressionTest {
   @Parameters
   def parameters: Collection[Array[String]] = {
-     for (brokerCompression <- BrokerCompressionCodec.brokerCompressionOptions;
+    (for (brokerCompression <- BrokerCompressionCodec.brokerCompressionOptions;
          messageCompression <- CompressionType.values
-    ) yield Array(messageCompression.name, brokerCompression)
+    ) yield Array(messageCompression.name, brokerCompression)).asJava
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index d82ec58..16f0636 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -31,7 +31,7 @@ import kafka.admin.AdminUtils
 import kafka.utils.TestUtils._
 
 import scala.collection._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.util.matching.Regex
 import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
 
@@ -96,7 +96,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   def testClusterIdMetric(): Unit ={
     // Check if clusterId metric exists.
     val metrics = Metrics.defaultRegistry().allMetrics
-    assertEquals(metrics.keySet.filter(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")).size, 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")), 1)
   }
 
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
@@ -114,7 +114,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   private def checkTopicMetricsExists(topic: String): Boolean = {
     val topicMetricRegex = new Regex(".*("+topic+")$")
     val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
-    for(metricGroup <- metricGroups) {
+    for(metricGroup <- metricGroups.asScala) {
       if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
         return true
     }


Mime
View raw message