kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject [2/2] git commit: KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun
Date Fri, 13 Sep 2013 19:56:02 GMT
KAFKA-1046 Added support for Scala 2.10 builds while maintaining compatibility with 2.8.x; reviewed by Neha and Jun


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e554c4d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e554c4d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e554c4d

Branch: refs/heads/trunk
Commit: 8e554c4d2acf5108805905b9f06198f20398ee3a
Parents: 75d95d9
Author: Christopher Freeman <cfreeman@linkedin.com>
Authored: Fri Sep 13 12:55:43 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Fri Sep 13 12:55:53 2013 -0700

----------------------------------------------------------------------
 core/build.sbt                                  |  1 +
 core/src/main/scala/kafka/Kafka.scala           |  2 +-
 .../kafka/admin/AddPartitionsCommand.scala      |  2 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala | 14 +++++---
 .../scala/kafka/admin/DeleteTopicCommand.scala  |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala |  6 ++--
 .../kafka/admin/ReassignPartitionsCommand.scala |  4 +--
 .../main/scala/kafka/client/ClientUtils.scala   |  2 +-
 core/src/main/scala/kafka/cluster/Broker.scala  |  2 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala  |  6 ++--
 .../kafka/consumer/ConsumerFetcherManager.scala |  4 +--
 .../scala/kafka/consumer/SimpleConsumer.scala   |  2 +-
 .../main/scala/kafka/consumer/TopicCount.scala  |  2 +-
 .../consumer/ZookeeperConsumerConnector.scala   | 10 +++---
 .../consumer/ZookeeperTopicEventWatcher.scala   |  2 +-
 .../controller/ControllerChannelManager.scala   |  4 +--
 .../kafka/controller/KafkaController.scala      | 16 ++++-----
 .../controller/PartitionStateMachine.scala      | 20 ++++++-----
 .../kafka/controller/ReplicaStateMachine.scala  |  4 +--
 .../main/scala/kafka/javaapi/FetchRequest.scala |  7 ++--
 .../main/scala/kafka/javaapi/Implicits.scala    |  6 ++++
 .../kafka/javaapi/OffsetCommitRequest.scala     |  5 ++-
 .../kafka/javaapi/OffsetCommitResponse.scala    |  3 +-
 .../kafka/javaapi/OffsetFetchRequest.scala      |  6 +++-
 .../kafka/javaapi/OffsetFetchResponse.scala     |  3 +-
 .../scala/kafka/javaapi/OffsetRequest.scala     |  7 ++--
 .../scala/kafka/javaapi/TopicMetadata.scala     | 24 +++++++++----
 .../kafka/javaapi/TopicMetadataRequest.scala    |  8 +++--
 .../consumer/ZookeeperConsumerConnector.scala   | 15 +++++---
 .../javaapi/message/ByteBufferMessageSet.scala  |  4 ++-
 .../scala/kafka/javaapi/producer/Producer.scala |  3 +-
 core/src/main/scala/kafka/log/Log.scala         | 14 +++++---
 core/src/main/scala/kafka/log/LogConfig.scala   |  3 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  4 +--
 .../network/BoundedByteBufferReceive.scala      |  2 +-
 .../scala/kafka/producer/SyncProducer.scala     |  2 +-
 .../producer/async/DefaultEventHandler.scala    |  4 +--
 .../producer/async/ProducerSendThread.scala     |  4 +--
 .../kafka/server/AbstractFetcherThread.scala    |  6 ++--
 .../src/main/scala/kafka/server/KafkaApis.scala | 12 +++----
 .../kafka/server/KafkaServerStartable.scala     |  4 +--
 .../scala/kafka/server/ReplicaManager.scala     |  2 +-
 .../scala/kafka/server/TopicConfigManager.scala |  6 ++--
 .../kafka/server/ZookeeperLeaderElector.scala   |  2 +-
 .../scala/kafka/tools/ImportZkOffsets.scala     |  2 +-
 core/src/main/scala/kafka/tools/JmxTool.scala   |  2 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |  4 +--
 .../scala/kafka/tools/SimpleConsumerShell.scala |  2 +-
 .../main/scala/kafka/utils/Annotations.scala    | 36 --------------------
 core/src/main/scala/kafka/utils/Json.scala      |  2 +-
 .../main/scala/kafka/utils/KafkaScheduler.scala |  2 +-
 .../src/main/scala/kafka/utils/Mx4jLoader.scala |  2 +-
 core/src/main/scala/kafka/utils/Pool.scala      | 12 +++++--
 .../kafka/utils/VerifiableProperties.scala      |  5 ++-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 24 ++++++-------
 .../unit/kafka/admin/AddPartitionsTest.scala    |  4 +--
 .../ZookeeperConsumerConnectorTest.scala        |  8 +++--
 .../ZookeeperConsumerConnectorTest.scala        |  9 ++---
 .../message/BaseMessageSetTestCases.scala       |  7 ++--
 .../src/test/scala/unit/kafka/log/LogTest.scala |  2 +-
 .../unit/kafka/metrics/KafkaTimerTest.scala     |  5 +--
 .../unit/kafka/producer/AsyncProducerTest.scala |  7 ++--
 .../unit/kafka/producer/ProducerTest.scala      | 14 ++++----
 .../unit/kafka/producer/SyncProducerTest.scala  |  4 +--
 .../test/scala/unit/kafka/utils/TestUtils.scala |  2 +-
 .../scala/kafka/perf/ConsumerPerformance.scala  |  2 +-
 project/Build.scala                             |  3 +-
 67 files changed, 230 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index c54cf44..b5bcb44 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -23,6 +23,7 @@ libraryDependencies ++= Seq(
 libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
   deps :+ (sv match {
     case "2.8.0" => "org.scalatest" %  "scalatest" % "1.2" % "test"
+    case v if v.startsWith("2.10") =>  "org.scalatest" %% "scalatest" % "1.9.1" % "test"
     case _       => "org.scalatest" %% "scalatest" % "1.8" % "test"
   })
 }
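
A note on the build change above: the %% operator makes sbt append the Scala binary version to the artifact name, so each cross-built Scala version resolves a matching scalatest artifact; 1.9.1 is selected for 2.10, presumably because the 1.8 line was never published against 2.10. A minimal illustration of what %% expands to:

    // Under scalaVersion "2.10.x" these two dependency declarations resolve identically:
    "org.scalatest" %% "scalatest"      % "1.9.1" % "test"
    "org.scalatest" %  "scalatest_2.10" % "1.9.1" % "test"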

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index dafb1ee..988014a 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -47,7 +47,7 @@ object Kafka extends Logging {
       kafkaServerStartble.awaitShutdown
     }
     catch {
-      case e => fatal(e)
+      case e: Throwable => fatal(e)
     }
     System.exit(0)
   }
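
The case e => to case e: Throwable => change above repeats throughout this patch. Scala 2.10 warns on an untyped catch-all pattern (it catches all Throwables, and the compiler suggests exactly the ascribed form), while the annotated pattern is behaviorally identical and compiles cleanly on both 2.8.x and 2.10. A minimal sketch, with doWork and log as hypothetical stand-ins:

    def doWork(): Unit = ()
    def log(t: Throwable): Unit = ()

    try doWork()
    catch {
      case e: Throwable => log(e)  // explicit catch-all: no warning on 2.10, same semantics on 2.8.x
    }

    // A 2.10-only alternative that skips fatal errors -- not usable here while 2.8.x is supported:
    // import scala.util.control.NonFatal
    // try doWork() catch { case NonFatal(e) => log(e) }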

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
index fd41661..c74d9c2 100644
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@ -68,7 +68,7 @@ object AddPartitionsCommand extends Logging {
       addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
       println("adding partitions succeeded!")
     } catch {
-      case e =>
+      case e: Throwable =>
         println("adding partitions failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 83ba729..6560fc6 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -138,7 +138,7 @@ object AdminUtils extends Logging {
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
       case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
-      case e2 => throw new AdminOperationException(e2.toString)
+      case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }
   
@@ -162,7 +162,11 @@ object AdminUtils extends Logging {
    */
   private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
     if(config.size > 0) {
-      val map = Map("version" -> 1, "config" -> JavaConversions.asMap(config))
+      val configMap: mutable.Map[String, String] = {
+        import JavaConversions._
+        config
+      }
+      val map = Map("version" -> 1, "config" -> configMap)
       ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
     }
   }
@@ -222,7 +226,7 @@ object AdminUtils extends Logging {
               try {
                 Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
               } catch {
-                case e => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
+                case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
               }
             case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
           }
@@ -230,7 +234,7 @@ object AdminUtils extends Logging {
             replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
             isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
           } catch {
-            case e => throw new ReplicaNotAvailableException(e)
+            case e: Throwable => throw new ReplicaNotAvailableException(e)
           }
           if(replicaInfo.size < replicas.size)
             throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
@@ -240,7 +244,7 @@ object AdminUtils extends Logging {
               inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
           new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
         } catch {
-          case e =>
+          case e: Throwable =>
             debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
             new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
               ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
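
The writeTopicConfig hunk above shows the other recurring theme of this patch: the named conversion helpers from Scala 2.8's JavaConversions (asMap, asBuffer, asList, asIterable) were deprecated in 2.9 and are gone in 2.10, so the code now imports JavaConversions._ in a tight scope and lets the expected type trigger the implicit view, which exists under both compilers. A sketch of the idiom, with asScalaConfig as a hypothetical name:

    import java.util.Properties
    import scala.collection.mutable

    def asScalaConfig(config: Properties): mutable.Map[String, String] = {
      import scala.collection.JavaConversions._  // implicit Properties -> mutable.Map view (2.8 and 2.10)
      config                                     // the declared result type forces the conversion
    }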

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
index 3da4518..804b331 100644
--- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
@@ -54,7 +54,7 @@ object DeleteTopicCommand {
       println("deletion succeeded!")
     }
     catch {
-      case e =>
+      case e: Throwable =>
         println("delection failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 53fc433..26beb96 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -60,7 +60,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
       println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
     } catch {
-      case e =>
+      case e: Throwable =>
         println("Failed to start preferred replica election")
         println(Utils.stackTrace(e))
     } finally {
@@ -105,7 +105,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
           PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
         throw new AdminOperationException("Preferred replica leader election currently in progress for " +
           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
-      case e2 => throw new AdminOperationException(e2.toString)
+      case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }
 }
@@ -117,7 +117,7 @@ class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scal
       val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition))
       PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
     } catch {
-      case e => throw new AdminCommandFailedException("Admin command failed", e)
+      case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index aa61fa1..f333d29 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -119,7 +119,7 @@ object ReassignPartitionsCommand extends Logging {
           "The replica assignment is \n" + partitionsToBeReassigned.toString())
       }
     } catch {
-      case e =>
+      case e: Throwable =>
         println("Partitions reassignment failed due to " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
@@ -142,7 +142,7 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T
         val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
         throw new AdminCommandFailedException("Partition reassignment currently in " +
         "progress for %s. Aborting operation".format(partitionsBeingReassigned))
-      case e => error("Admin command failed", e); false
+      case e: Throwable => error("Admin command failed", e); false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index cc526ec..1d2f81b 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -54,7 +54,7 @@ object ClientUtils extends Logging{
         fetchMetaDataSucceeded = true
       }
       catch {
-        case e =>
+        case e: Throwable =>
           warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
             .format(correlationId, topics, shuffledBrokers(i).toString), e)
           t = e

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index b03dea2..9407ed2 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -42,7 +42,7 @@ private[kafka] object Broker {
           throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
       }
     } catch {
-      case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
+      case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 140f2e3..dc066c2 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -204,7 +204,7 @@ object ConsoleConsumer extends Logging {
           formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
           numMessages += 1
         } catch {
-          case e =>
+          case e: Throwable =>
             if (skipMessageOnError)
               error("Error processing message, skipping this message: ", e)
             else
@@ -220,7 +220,7 @@ object ConsoleConsumer extends Logging {
         }
       }
     } catch {
-      case e => error("Error processing message, stopping consumer: ", e)
+      case e: Throwable => error("Error processing message, stopping consumer: ", e)
     }
     System.err.println("Consumed %d messages".format(numMessages))
     System.out.flush()
@@ -247,7 +247,7 @@ object ConsoleConsumer extends Logging {
       zk.deleteRecursive(dir)
       zk.close()
     } catch {
-      case _ => // swallow
+      case _: Throwable => // swallow
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index fa6b213..8c03308 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -79,7 +79,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           }
         }
       } catch {
-        case t => {
+        case t: Throwable => {
             if (!isRunning.get())
               throw t /* If this thread is stopped, propagate this exception to kill the thread. */
             else
@@ -95,7 +95,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           try {
             addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
           } catch {
-            case t => {
+            case t: Throwable => {
                 if (!isRunning.get())
                   throw t /* If this thread is stopped, propagate this exception to kill the thread. */
                 else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 77e1ce2..24f7fb5 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -84,7 +84,7 @@ class SimpleConsumer(val host: String,
               disconnect()
               throw ioe
           }
-        case e => throw e
+        case e: Throwable => throw e
       }
       response
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c8e8406..a3eb53e 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -67,7 +67,7 @@ private[kafka] object TopicCount extends Logging {
         case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
       }
     } catch {
-      case e =>
+      case e: Throwable =>
         error("error parsing consumer json string " + topicCountString, e)
         throw e
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 08c9e4f..857fd4d 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -178,7 +178,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             zkClient = null
           }
         } catch {
-          case e =>
+          case e: Throwable =>
             fatal("error during consumer connector shutdown", e)
         }
         info("ZKConsumerConnector shut down completed")
@@ -335,7 +335,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             if (doRebalance)
               syncedRebalance
           } catch {
-            case t => error("error during syncedRebalance", t)
+            case t: Throwable => error("error during syncedRebalance", t)
           }
         }
         info("stopping watcher executor thread for consumer " + consumerIdString)
@@ -387,7 +387,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               cluster = getCluster(zkClient)
               done = rebalance(cluster)
             } catch {
-              case e =>
+              case e: Throwable =>
                 /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
                  * For example, a ZK node can disappear between the time we get all children and the time we try to get
                  * the value of a child. Just let this go since another rebalance will be triggered.
@@ -464,7 +464,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             " for topic " + topic + " with consumers: " + curConsumers)
 
           for (consumerThreadId <- consumerThreadIdSet) {
-            val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+            val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
             assert(myConsumerPosition >= 0)
             val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
             val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
@@ -584,7 +584,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             // The node hasn't been deleted by the original owner. So wait a bit and retry.
             info("waiting for the partition ownership to be deleted: " + partition)
             false
-          case e2 => throw e2
+          case e2: Throwable => throw e2
         }
       }
       val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
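
One substantive change above, beyond the Throwable annotations: curConsumers.findIndexOf(_ == consumerThreadId) becomes curConsumers.indexOf(consumerThreadId). findIndexOf is deprecated (indexWhere is the documented replacement), and since the predicate is a plain equality test, indexOf says the same thing directly:

    val curConsumers = List("c1-0", "c1-1", "c2-0")
    curConsumers.indexOf("c1-1")          // 1: equality search, as used in the patch
    curConsumers.indexWhere(_ == "c1-1")  // 1: predicate form, the drop-in replacement for findIndexOf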

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index df83baa..a67c193 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -75,7 +75,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
           }
         }
         catch {
-          case e =>
+          case e: Throwable =>
             error("error in handling child changes", e)
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ed1ce0b..beca460 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -93,7 +93,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
       brokerStateInfo(brokerId).requestSendThread.shutdown()
       brokerStateInfo.remove(brokerId)
     }catch {
-      case e => error("Error while removing broker by the controller", e)
+      case e: Throwable => error("Error while removing broker by the controller", e)
     }
   }
 
@@ -142,7 +142,7 @@ class RequestSendThread(val controllerId: Int,
         }
       }
     } catch {
-      case e =>
+      case e: Throwable =>
         warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e)
         // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
         channel.disconnect()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ab18b7a..aef41ad 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -89,14 +89,14 @@ object KafkaController extends Logging {
         case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
       }
     } catch {
-      case t =>
+      case t: Throwable =>
         // It may be due to an incompatible controller register version
         warn("Failed to parse the controller info as json. "
           + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
         try {
           return controllerInfoString.toInt
         } catch {
-          case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+          case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
         }
     }
   }
@@ -436,7 +436,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
           .format(topicAndPartition))
       }
     } catch {
-      case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
+      case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
       // remove the partition from the admin path to unblock the admin client
       removePartitionFromReassignedPartitions(topicAndPartition)
     }
@@ -448,7 +448,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
       partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
     } catch {
-      case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+      case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
       removePartitionsFromPreferredReplicaElection(partitions)
     }
@@ -514,9 +514,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         } catch {
           case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
             "Aborting controller startup procedure")
-          case oe => error("Error while incrementing controller epoch", oe)
+          case oe: Throwable => error("Error while incrementing controller epoch", oe)
         }
-      case oe => error("Error while incrementing controller epoch", oe)
+      case oe: Throwable => error("Error while incrementing controller epoch", oe)
 
     }
     info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
@@ -693,7 +693,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
       case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
-      case e2 => throw new KafkaException(e2.toString)
+      case e2: Throwable => throw new KafkaException(e2.toString)
     }
   }
 
@@ -905,7 +905,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
         }
       }
     }catch {
-      case e => error("Error while handling partition reassignment", e)
+      case e: Throwable => error("Error while handling partition reassignment", e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index a084830..829163a 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,7 +17,8 @@
 package kafka.controller
 
 import collection._
-import collection.JavaConversions._
+import collection.JavaConversions
+import collection.mutable.Buffer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
@@ -91,7 +92,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     } catch {
-      case e => error("Error while moving some partitions to the online state", e)
+      case e: Throwable => error("Error while moving some partitions to the online state", e)
       // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
     }
   }
@@ -111,7 +112,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     }catch {
-      case e => error("Error while moving some partitions to %s state".format(targetState), e)
+      case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
       // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
     }
   }
@@ -321,7 +322,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     } catch {
       case lenne: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe
-      case sce =>
+      case sce: Throwable =>
         val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
         stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
         throw new StateChangeFailedException(failMsg, sce)
@@ -359,8 +360,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       controllerContext.controllerLock synchronized {
         if (hasStarted.get) {
           try {
-            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
-            val currentChildren = JavaConversions.asBuffer(children).toSet
+            val currentChildren = {
+              import JavaConversions._
+              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
+              (children: Buffer[String]).toSet
+            }
             val newTopics = currentChildren -- controllerContext.allTopics
             val deletedTopics = controllerContext.allTopics -- currentChildren
             //        val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
@@ -375,7 +379,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             if(newTopics.size > 0)
               controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
           } catch {
-            case e => error("Error while handling new topic", e )
+            case e: Throwable => error("Error while handling new topic", e )
           }
           // TODO: kafka-330  Handle deleted topics
         }
@@ -399,7 +403,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
           controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
         } catch {
-          case e => error("Error while handling add partitions for data path " + dataPath, e )
+          case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c964857..212c05d 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -89,7 +89,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     }catch {
-      case e => error("Error while moving some replicas to %s state".format(targetState), e)
+      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
     }
   }
 
@@ -273,7 +273,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               if(deadBrokerIds.size > 0)
                 controller.onBrokerFailure(deadBrokerIds.toSeq)
             } catch {
-              case e => error("Error while handling broker changes", e)
+              case e: Throwable => error("Error while handling broker changes", e)
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index 83d8cbc..4060077 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -17,10 +17,10 @@
 
 package kafka.javaapi
 
-import scala.collection.JavaConversions
 import java.nio.ByteBuffer
 import kafka.common.TopicAndPartition
 import kafka.api.{Request, PartitionFetchInfo}
+import scala.collection.mutable
 
 class FetchRequest(correlationId: Int,
                    clientId: String,
@@ -29,7 +29,10 @@ class FetchRequest(correlationId: Int,
                    requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
 
   val underlying = {
-    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    val scalaMap: Map[TopicAndPartition, PartitionFetchInfo] = {
+      import scala.collection.JavaConversions._
+      (requestInfo: mutable.Map[TopicAndPartition, PartitionFetchInfo]).toMap
+    }
     kafka.api.FetchRequest(
       correlationId = correlationId,
       clientId = clientId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/Implicits.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index 9a63914..8baf4d4 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -46,4 +46,10 @@ private[javaapi] object Implicits extends Logging {
       case None => null.asInstanceOf[T]
     }
   }
+
+  // used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors
+  implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = {
+    import scala.collection.JavaConversions._
+    l: collection.mutable.Buffer[A]
+  }
 }
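
The comment on the new conversion spells out the constraint: per SI-4141, Scala 2.8.1 does not resolve implicit views inside constructor bodies, so ByteBufferMessageSet (next file) must name the conversion instead of relying on the import. Roughly, assuming a caller-supplied java.util.List:

    import kafka.javaapi.Implicits.javaListToScalaBuffer
    import kafka.message.Message

    val jMessages: java.util.List[Message] = new java.util.ArrayList[Message]()
    val buf = javaListToScalaBuffer(jMessages)  // explicit call: works inside 2.8.1 constructors
    // val buf2: collection.mutable.Buffer[Message] = jMessages  // implicit view: invisible there under SI-4141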

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 32033d6..57b9d2a 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -27,7 +27,10 @@ class OffsetCommitRequest(groupId: String,
                           correlationId: Int,
                           clientId: String) {
   val underlying = {
-    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    val scalaMap: Map[TopicAndPartition, OffsetMetadataAndError] = {
+      import JavaConversions._
+      requestInfo.toMap
+    }
     kafka.api.OffsetCommitRequest(
       groupId = groupId,
       requestInfo = scalaMap,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
index d1c50c4..570bf31 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -23,7 +23,8 @@ import collection.JavaConversions
 class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitResponse) {
 
   def errors: java.util.Map[TopicAndPartition, Short] = {
-    JavaConversions.asMap(underlying.requestInfo) 
+    import JavaConversions._
+    underlying.requestInfo
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
index 64d134b..5b4f4bb 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
@@ -18,6 +18,7 @@
 package kafka.javaapi
 
 import kafka.common.TopicAndPartition
+import scala.collection.mutable
 import collection.JavaConversions
 import java.nio.ByteBuffer
 
@@ -28,7 +29,10 @@ class OffsetFetchRequest(groupId: String,
                          clientId: String) {
 
   val underlying = {
-    val scalaSeq = JavaConversions.asBuffer(requestInfo)
+    val scalaSeq = {
+      import JavaConversions._
+      requestInfo: mutable.Buffer[TopicAndPartition]
+    }
     kafka.api.OffsetFetchRequest(
       groupId = groupId,
       requestInfo = scalaSeq,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
index 9f83c1b..60924d2 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
@@ -23,7 +23,8 @@ import collection.JavaConversions
 class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse) {
 
   def offsets: java.util.Map[TopicAndPartition, OffsetMetadataAndError] = {
-    JavaConversions.asMap(underlying.requestInfo)
+    import JavaConversions._
+    underlying.requestInfo
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index 3565a15..c8a0ded 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -19,7 +19,7 @@ package kafka.javaapi
 
 import kafka.common.TopicAndPartition
 import kafka.api.{Request, PartitionOffsetRequestInfo}
-import collection.JavaConversions
+import scala.collection.mutable
 import java.nio.ByteBuffer
 
 
@@ -28,7 +28,10 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
                     clientId: String) {
 
   val underlying = {
-    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    val scalaMap = {
+      import collection.JavaConversions._
+      (requestInfo: mutable.Map[TopicAndPartition, PartitionOffsetRequestInfo]).toMap
+    }
     kafka.api.OffsetRequest(
       requestInfo = scalaMap,
       versionId = versionId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index 97b6dcd..d08c3f4 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,16 +17,20 @@
 package kafka.javaapi
 
 import kafka.cluster.Broker
-import scala.collection.JavaConversions.asList
+import scala.collection.JavaConversions
 
 private[javaapi] object MetadataListImplicits {
   implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
-  java.util.List[kafka.javaapi.TopicMetadata] =
-    asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)))
+  java.util.List[kafka.javaapi.TopicMetadata] = {
+    import JavaConversions._
+    topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
+  }
 
   implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
-  java.util.List[kafka.javaapi.PartitionMetadata] =
-    asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)))
+  java.util.List[kafka.javaapi.PartitionMetadata] = {
+    import JavaConversions._
+    partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
+  }
 }
 
 class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -51,9 +55,15 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
     underlying.leader
   }
 
-  def replicas: java.util.List[Broker] = asList(underlying.replicas)
+  def replicas: java.util.List[Broker] = {
+    import JavaConversions._
+    underlying.replicas
+  }
 
-  def isr: java.util.List[Broker] = asList(underlying.isr)
+  def isr: java.util.List[Broker] = {
+    import JavaConversions._
+    underlying.isr
+  }
 
   def errorCode: Short = underlying.errorCode
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 5f80df7..05757a1 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -18,7 +18,7 @@ package kafka.javaapi
 
 import kafka.api._
 import java.nio.ByteBuffer
-import scala.collection.JavaConversions
+import scala.collection.mutable
 
 class TopicMetadataRequest(val versionId: Short,
                            override val correlationId: Int,
@@ -26,8 +26,10 @@ class TopicMetadataRequest(val versionId: Short,
                            val topics: java.util.List[String])
     extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
 
-  val underlying: kafka.api.TopicMetadataRequest =
-    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
+  val underlying: kafka.api.TopicMetadataRequest = {
+    import scala.collection.JavaConversions._
+    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
+  }
 
   def this(topics: java.util.List[String]) =
     this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
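
Note the type ascription topics: mutable.Buffer[String] in the call above: ascribing an expression makes the compiler insert the in-scope java.util.List -> Buffer implicit view, standing in for the removed JavaConversions.asBuffer helper. The same trick in isolation:

    import scala.collection.mutable
    import scala.collection.JavaConversions._

    val jTopics: java.util.List[String] = java.util.Arrays.asList("t1", "t2")
    val sTopics = (jTopics: mutable.Buffer[String])  // ascription triggers the implicit conversion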

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 14c4c8a..58e83f6 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -18,7 +18,8 @@ package kafka.javaapi.consumer
 
 import kafka.serializer._
 import kafka.consumer._
-import scala.collection.JavaConversions.asList
+import scala.collection.mutable
+import scala.collection.JavaConversions
 
 
 /**
@@ -71,9 +72,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         keyDecoder: Decoder[K],
         valueDecoder: Decoder[V])
       : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
-    import scala.collection.JavaConversions._
 
-    val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+    val scalaTopicCountMap: Map[String, Int] = {
+      import JavaConversions._
+      Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
+    }
     val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
     val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
     for ((topic, streams) <- scalaReturn) {
@@ -88,8 +91,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
     createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
     
-  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
-    asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = {
+    import JavaConversions._
+    underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
+  }
 
   def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = 
     createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0a95248..fecee8d 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -20,12 +20,14 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.reflect.BeanProperty
 import java.nio.ByteBuffer
 import kafka.message._
+import kafka.javaapi.Implicits.javaListToScalaBuffer
 
 class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet {
   private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
   
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer)
+    // due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
   }
 
   def this(messages: java.util.List[Message]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/javaapi/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 7265328..c465da5 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -19,6 +19,7 @@ package kafka.javaapi.producer
 
 import kafka.producer.ProducerConfig
 import kafka.producer.KeyedMessage
+import scala.collection.mutable
 
 class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
 {
@@ -38,7 +39,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
    */
   def send(messages: java.util.List[KeyedMessage[K,V]]) {
     import collection.JavaConversions._
-    underlying.send(asBuffer(messages):_*)
+    underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 626eb8f..9fe61ff 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -21,7 +21,7 @@ import java.io.{IOException, File}
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
 import java.util.concurrent.atomic._
 import kafka.utils._
-import scala.collection.JavaConversions.asIterable;
+import scala.collection.JavaConversions
 import java.text.NumberFormat
 import kafka.message._
 import kafka.common._
@@ -162,7 +162,7 @@ class Log(val dir: File,
   }
   
   private def recoverLog() {
-    val lastOffset = try {activeSegment.nextOffset} catch {case _ => -1L}
+    val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
     if(lastOffset <= this.recoveryPoint) {
       info("Log '%s' is fully intact, skipping recovery".format(name))
       this.recoveryPoint = lastOffset
@@ -581,13 +581,19 @@ class Log(val dir: File,
   /**
    * All the log segments in this log ordered from oldest to newest
    */
-  def logSegments: Iterable[LogSegment] = asIterable(segments.values)
+  def logSegments: Iterable[LogSegment] = {
+    import JavaConversions._
+    segments.values
+  }
   
   /**
    * Get all segments beginning with the segment that includes "from" and ending with the segment
    * that includes up to "to-1" or the end of the log (if to > logEndOffset)
    */
-  def logSegments(from: Long, to: Long) = asIterable(segments.subMap(from, true, to, false).values)
+  def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
+    import JavaConversions._
+    segments.subMap(from, true, to, false).values
+  }
   
   override def toString() = "Log(" + dir + ")"
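
A detail worth keeping in mind about logSegments above: the JavaConversions wrappers are live views, not copies, so iterating the returned Iterable reflects concurrent updates to the underlying ConcurrentSkipListMap, matching what the old asIterable helper returned. A small sketch:

    import java.util.concurrent.ConcurrentSkipListMap
    import scala.collection.JavaConversions._

    val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
    segments.put(0L, "segment-0")
    val live: Iterable[String] = segments.values  // wrapper view over the map's values
    segments.put(10L, "segment-10")
    live.foreach(println)                         // iteration sees both entries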
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 48660bc..51ec796 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -133,7 +133,8 @@ object LogConfig {
    * Check that property names are valid
    */
   private def validateNames(props: Properties) {
-    for(name <- JavaConversions.asMap(props).keys)
+    import JavaConversions._
+    for(name <- props.keys)
       require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d039f9d..4719715 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -228,7 +228,7 @@ class LogManager(val logDirs: Array[File],
            .format(topicAndPartition.topic, 
                    topicAndPartition.partition, 
                    dataDir.getAbsolutePath,
-                   JavaConversions.asMap(config.toProps).mkString(", ")))
+                   {import JavaConversions._; config.toProps.mkString(", ")}))
       log
     }
   }
@@ -320,7 +320,7 @@ class LogManager(val logDirs: Array[File],
         if(timeSinceLastFlush >= log.config.flushMs)
           log.flush
       } catch {
-        case e =>
+        case e: Throwable =>
           error("Error flushing topic " + topicAndPartition.topic, e)
           e match {
             case _: IOException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
index cab1864..a442545 100644
--- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
+++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
@@ -82,7 +82,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
       case e: OutOfMemoryError =>
         error("OOME with size " + size, e)
         throw e
-      case e2 =>
+      case e2: Throwable =>
         throw e2
     }
     buffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 306f200..419156e 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -79,7 +79,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
           // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
           disconnect()
           throw e
-        case e => throw e
+        case e: Throwable => throw e
       }
       response
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 65613ce..c8326a8 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -129,7 +129,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         else
           serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message)))
       } catch {
-        case t =>
+        case t: Throwable =>
           producerStats.serializationErrorRate.mark()
           if (isSync) {
             throw t
@@ -178,7 +178,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
       case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
       case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
-      case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
+      case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 2b41a49..42e9c74 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -43,7 +43,7 @@ class ProducerSendThread[K,V](val threadName: String,
     try {
       processEvents
     }catch {
-      case e => error("Error in sending events: ", e)
+      case e: Throwable => error("Error in sending events: ", e)
     }finally {
       shutdownLatch.countDown
     }
@@ -103,7 +103,7 @@ class ProducerSendThread[K,V](val threadName: String,
       if(size > 0)
         handler.handle(events)
     }catch {
-      case e => error("Error in handling batch of " + size + " events", e)
+      case e: Throwable => error("Error in handling batch of " + size + " events", e)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index d5addb3..a5fc96d 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -95,7 +95,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
       trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
-      case t =>
+      case t: Throwable =>
         if (isRunning.get) {
           warn("Error in fetch %s".format(fetchRequest), t)
           partitionMapLock synchronized {
@@ -136,7 +136,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       //    should get fixed in the subsequent fetches
                       logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
-                    case e =>
+                    case e: Throwable =>
                       throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                                .format(topic, partitionId, currentOffset.get), e)
                   }
@@ -147,7 +147,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                     warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                       .format(currentOffset.get, topic, partitionId, newOffset))
                   } catch {
-                    case e =>
+                    case e: Throwable =>
                       warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                       partitionsWithError += topicAndPartition
                   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0287f87..338d1cc 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -277,7 +277,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
                producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
           new ProduceResult(topicAndPartition, nle)
-        case e =>
+        case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
           error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
@@ -366,7 +366,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
                 fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
               new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-            case t =>
+            case t: Throwable =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
               error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d"
@@ -446,7 +446,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
                offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
-        case e =>
+        case e: Throwable =>
           warn("Error while responding to offset request", e)
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
       }
@@ -550,7 +550,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                     isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
                 new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
               } catch {
-                case e =>
+                case e: Throwable =>
                   error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
                   new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
                     ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
@@ -609,7 +609,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             (topicAndPartition, ErrorMapping.NoError)
           }
         } catch {
-          case e => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+          case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
         }
       }
     }
@@ -635,7 +635,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                           ErrorMapping.UnknownTopicOrPartitionCode))
         }
       } catch {
-        case e => 
+        case e: Throwable =>
           (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 5be65e9..acda52b 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -34,7 +34,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
       server.startup()
     }
     catch {
-      case e =>
+      case e: Throwable =>
         fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
         shutdown()
         System.exit(1)
@@ -46,7 +46,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
       server.shutdown()
     }
     catch {
-      case e =>
+      case e: Throwable =>
         fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
         System.exit(1)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3ca4419..ee1cc0c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -225,7 +225,7 @@ class ReplicaManager(val config: KafkaConfig,
             makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
                          leaderAndISRRequest.correlationId)
         } catch {
-          case e =>
+          case e: Throwable =>
             val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
                             "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
                                                                 leaderAndISRRequest.controllerEpoch, topicAndPartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
index 5814cb7..56cae58 100644
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -77,7 +77,8 @@ class TopicConfigManager(private val zkClient: ZkClient,
    */
   private def processAllConfigChanges() {
     val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath)
-    processConfigChanges(JavaConversions.asBuffer(configChanges).sorted)
+    import JavaConversions._
+    processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
   }
 
   /**
@@ -123,7 +124,8 @@ class TopicConfigManager(private val zkClient: ZkClient,
   object ConfigChangeListener extends IZkChildListener {
     override def handleChildChange(path: String, chillins: java.util.List[String]) {
       try {
-        processConfigChanges(JavaConversions.asBuffer(chillins))
+        import JavaConversions._
+        processConfigChanges(chillins: mutable.Buffer[String])
       } catch {
         case e: Exception => error("Error processing config change:", e)
       }

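Where no expected type is available to trigger the implicit view, as in the processConfigChanges calls above, the patch supplies one with a type ascription. A small self-contained sketch of the idiom (the names are illustrative, not Kafka's):

    import java.util.Arrays
    import scala.collection.mutable
    import scala.collection.JavaConversions._

    object AscriptionSketch {
      def process(changes: Seq[String]): Unit = changes.foreach(println)

      def main(args: Array[String]) {
        val fromJava: java.util.List[String] = Arrays.asList("b", "a")
        // The ascription names the target type, so the compiler inserts the
        // java.util.List-to-mutable.Buffer view; a bare process(fromJava)
        // would not type-check.
        process((fromJava: mutable.Buffer[String]).sorted)
      }
    }
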
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index f1f0625..33b7360 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -72,7 +72,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
           }
           if (leaderId != -1)
             debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-        case e2 =>
+        case e2: Throwable =>
           error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
           leaderId = -1
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 55709b5..c8023ee 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -102,7 +102,7 @@ object ImportZkOffsets extends Logging {
       try {
         ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
       } catch {
-        case e => e.printStackTrace()
+        case e: Throwable => e.printStackTrace()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 7e424e7..747a675 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -86,7 +86,7 @@ object JmxTool extends Logging {
       else
         List(null)
 
-    val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
+    val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
     val allAttributes: Iterable[(ObjectName, Array[String])] =
       names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 6fb545a..f0f871c 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -129,7 +129,7 @@ object MirrorMaker extends Logging {
     try {
       streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
     } catch {
-      case t =>
+      case t: Throwable =>
         fatal("Unable to create stream - shutting down mirror maker.")
         connectors.foreach(_.shutdown)
     }
@@ -204,7 +204,7 @@ object MirrorMaker extends Logging {
           }
         }
       } catch {
-        case e =>
+        case e: Throwable =>
           fatal("Stream unexpectedly exited.", e)
       } finally {
         shutdownLatch.countDown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 7629329..747e072 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -217,7 +217,7 @@ object SimpleConsumerShell extends Logging {
                 formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out)
                 numMessagesConsumed += 1
               } catch {
-                case e =>
+                case e: Throwable =>
                   if (skipMessageOnError)
                     error("Error processing message, skipping this message: ", e)
                   else

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/utils/Annotations.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Annotations.scala b/core/src/main/scala/kafka/utils/Annotations.scala
deleted file mode 100644
index 28269eb..0000000
--- a/core/src/main/scala/kafka/utils/Annotations.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-/* Some helpful annotations */
-
-/**
- * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation 
- * must respect
- */
-class threadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is not threadsafe
- */
-class nonthreadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is immutable
- */
-class immutable extends StaticAnnotation

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index 3f1252c..d110284 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -36,7 +36,7 @@ object Json extends Logging {
       try {
         JSON.parseFull(input)
       } catch {
-        case t =>
+        case t: Throwable =>
           throw new KafkaException("Can't parse json string: %s".format(input), t)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 73457e1..8e37505 100644
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -99,7 +99,7 @@ class KafkaScheduler(val threads: Int,
           trace("Begining execution of scheduled task '%s'.".format(name))
           fun()
         } catch {
-          case t => error("Uncaught exception in scheduled task '" + name +"'", t)
+          case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
         } finally {
           trace("Completed execution of scheduled task '%s'.".format(name))
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/utils/Mx4jLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index 64d84cc..db9f20b 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -64,7 +64,7 @@ object Mx4jLoader extends Logging {
 	  case e: ClassNotFoundException => {
         info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
       }
-      case e => {
+      case e: Throwable => {
         warn("Could not start register mbean in JMX", e);
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 9a86eab..9ddcde7 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -19,6 +19,7 @@ package kafka.utils
 
 import java.util.ArrayList
 import java.util.concurrent._
+import collection.mutable
 import collection.JavaConversions
 import kafka.common.KafkaException
 import java.lang.Object
@@ -71,10 +72,15 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
   
   def remove(key: K): V = pool.remove(key)
   
-  def keys = JavaConversions.asSet(pool.keySet())
+  def keys: mutable.Set[K] = {
+    import JavaConversions._
+    pool.keySet()
+  }
   
-  def values: Iterable[V] = 
-    JavaConversions.asIterable(new ArrayList[V](pool.values()))
+  def values: Iterable[V] = {
+    import JavaConversions._
+    new ArrayList[V](pool.values())
+  }
   
   def clear() { pool.clear() }
   

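Two details are worth noting in the Pool change above: keys returns a live view of the underlying concurrent map's key set (the implicit conversion wraps it, it does not copy), while values still copies into a fresh ArrayList first, so callers iterate a snapshot that cannot be disturbed by concurrent puts and removes. A simplified sketch with a bare ConcurrentHashMap standing in for Pool's internal map:

    import java.util.ArrayList
    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.mutable
    import scala.collection.JavaConversions._

    object PoolViewSketch {
      private val pool = new ConcurrentHashMap[String, Int]()

      // Live view: additions and removals on `pool` show up here, and
      // removing through this Set removes from `pool`.
      def keys: mutable.Set[String] = pool.keySet()

      // Snapshot: the ArrayList copy happens eagerly, so an iteration in
      // progress is isolated from later changes to `pool`.
      def values: Iterable[Int] = new ArrayList[Int](pool.values())
    }
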
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index 9009a9d..a288ad5 100644
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -195,7 +195,10 @@ class VerifiableProperties(val props: Properties) extends Logging {
 
   def verify() {
     info("Verifying properties")
-    val propNames = JavaConversions.asBuffer(Collections.list(props.propertyNames)).map(_.toString).sorted
+    val propNames = {
+      import JavaConversions._
+      Collections.list(props.propertyNames).map(_.toString).sorted
+    }
     for(key <- propNames) {
       if (!referenceSet.contains(key))
         warn("Property %s is not valid".format(key))

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4094dcb..d1c4b3d 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -278,7 +278,7 @@ object ZkUtils extends Logging {
           storedData = readData(client, path)._1
         } catch {
           case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
-          case e2 => throw e2
+          case e2: Throwable => throw e2
         }
         if (storedData == null || storedData != data) {
           info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -288,7 +288,7 @@ object ZkUtils extends Logging {
           info(path + " exists with value " + data + " during connection loss; this is ok")
         }
       }
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -328,7 +328,7 @@ object ZkUtils extends Logging {
             case None => // the node disappeared; retry creating the ephemeral node immediately
           }
         }
-        case e2 => throw e2
+        case e2: Throwable => throw e2
       }
     }
   }
@@ -367,10 +367,10 @@ object ZkUtils extends Logging {
         } catch {
           case e: ZkNodeExistsException =>
             client.writeData(path, data)
-          case e2 => throw e2
+          case e2: Throwable => throw e2
         }
       }
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -423,7 +423,7 @@ object ZkUtils extends Logging {
         createParentPath(client, path)
         client.createEphemeral(path, data)
       }
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
   
@@ -435,7 +435,7 @@ object ZkUtils extends Logging {
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
         false
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -446,7 +446,7 @@ object ZkUtils extends Logging {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
   
@@ -456,7 +456,7 @@ object ZkUtils extends Logging {
       zk.deleteRecursive(dir)
       zk.close()
     } catch {
-      case _ => // swallow
+      case _: Throwable => // swallow
     }
   }
 
@@ -473,7 +473,7 @@ object ZkUtils extends Logging {
                       } catch {
                         case e: ZkNoNodeException =>
                           (None, stat)
-                        case e2 => throw e2
+                        case e2: Throwable => throw e2
                       }
     dataAndStat
   }
@@ -491,7 +491,7 @@ object ZkUtils extends Logging {
       client.getChildren(path)
     } catch {
       case e: ZkNoNodeException => return Nil
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -682,7 +682,7 @@ object ZkUtils extends Logging {
           case nne: ZkNoNodeException =>
             ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
             debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
-          case e2 => throw new AdminOperationException(e2.toString)
+          case e2: Throwable => throw new AdminOperationException(e2.toString)
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index abcbed8..09254cc 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -104,7 +104,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
       fail("Topic should not exist")
     } catch {
       case e: AdminOperationException => //this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 
@@ -114,7 +114,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
       fail("Add partitions should fail")
     } catch {
       case e: AdminOperationException => //this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 121b6c5..8fe7259 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -83,7 +83,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
         fail("should get an exception")
       } catch {
         case e: ConsumerTimeoutException => // this is ok
-        case e => throw e
+        case e: Throwable => throw e
       }
     }
 
@@ -406,10 +406,12 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   }
 
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
-    import scala.collection.JavaConversions
     val children = zkClient.getChildren(path)
     Collections.sort(children)
-    val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
+    val childrenAsSeq : Seq[java.lang.String] = {
+      import JavaConversions._
+      children.toSeq
+    }
     childrenAsSeq.map(partition =>
       (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e554c4d/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 9f243f0..43af649 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -21,7 +21,7 @@ import junit.framework.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import org.scalatest.junit.JUnit3Suite
-import scala.collection.JavaConversions._
+import scala.collection.JavaConversions
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer._
@@ -84,8 +84,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     for (partition <- 0 until numParts) {
       val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
       messages ++= ms
-      import scala.collection.JavaConversions._
-      javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _))))
+      import JavaConversions._
+      javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
     }
     javaProducer.close
     messages
@@ -103,7 +103,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   def getMessages(nMessagesPerThread: Int, 
                   jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
     var messages: List[String] = Nil
-    val topicMessageStreams = asMap(jTopicMessageStreams)
+    import scala.collection.JavaConversions._
+    val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator

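The javaapi test above exercises the opposite direction: a Scala sequence ascribed to java.util.List so the implicit view wraps it for the Java-facing producer, and a java.util.Map ascribed back to a mutable Scala Map for the assertions. A sketch of the Scala-to-Java half (send here is a stand-in for the javaapi producer's signature):

    import scala.collection.JavaConversions._

    object ScalaToJavaSketch {
      // Stand-in for a Java-facing API that takes java.util.List.
      def send(batch: java.util.List[String]): Unit =
        println("sending " + batch.size + " messages")

      def main(args: Array[String]) {
        val ms = (0 until 3).map(i => "msg-" + i)
        // The ascription selects the Seq-to-java.util.List view; the result
        // wraps the Scala sequence rather than copying it.
        send(ms: java.util.List[String])
      }
    }
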
