kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [2/4] kafka git commit: MINOR: A bunch of clean-ups related to usage of unused variables
Date Tue, 25 Oct 2016 02:17:45 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 653b40c..b7fc657 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -52,7 +52,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+    topicInfos = configs.map(_ => new PartitionTopicInfo(topic,
       0,
       queue,
       new AtomicLong(consumedOffset),
@@ -77,7 +77,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
                                                     new StringDecoder(), 
                                                     new StringDecoder(),
                                                     clientId = "")
-    val receivedMessages = (0 until 5).map(i => iter.next.message).toList
+    val receivedMessages = (0 until 5).map(_ => iter.next.message)
 
     assertFalse(iter.hasNext)
     assertEquals(0, queue.size) // Shutdown command has been consumed.
@@ -101,17 +101,14 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
       new FailDecoder(),
       clientId = "")
 
-    val receivedMessages = (0 until 5).map{ i =>
+    (0 until 5).foreach { i =>
       assertTrue(iter.hasNext)
       val message = iter.next
       assertEquals(message.offset, i + consumedOffset)
 
-      try {
-        message.message // should fail
-      }
+      try message.message // should fail
       catch {
-        case e: UnsupportedOperationException => // this is ok
-        case e2: Throwable => fail("Unexpected exception when iterating the message set. " + e2.getMessage)
+        case _: UnsupportedOperationException => // this is ok
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 818229c..9e568f8 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -35,49 +35,49 @@ class PartitionAssignorTest extends Logging {
     val assignor = new RoundRobinAssignor
 
     /** various scenarios with only wildcard consumers */
-    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+    (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
       val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
       val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
 
       val topicPartitionCounts = Map((1 to topicCount).map(topic => {
         ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
-      }).toSeq:_*)
+      }):_*)
       
-      val subscriptions = Map((1 to consumerCount).map(consumer => {
+      val subscriptions = Map((1 to consumerCount).map { consumer =>
         val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
         ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))
-      }).toSeq:_*)
+      }:_*)
       val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
       val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
       EasyMock.replay(zkUtils.zkClient)
       PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, verifyAssignmentIsUniform = true)
-    })
+    }
   }
 
   @Test
   def testRangePartitionAssignor() {
     val assignor = new RangeAssignor
-    (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+    (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
       val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
       val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
 
       val topicPartitionCounts = Map((1 to topicCount).map(topic => {
         ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
-      }).toSeq:_*)
+      }):_*)
 
-      val subscriptions = Map((1 to consumerCount).map(consumer => {
+      val subscriptions = Map((1 to consumerCount).map { consumer =>
         val streamCounts = Map((1 to topicCount).map(topic => {
             val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
             ("topic-" + topic, streamCount)
-          }).toSeq:_*)
+          }):_*)
         ("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
-      }).toSeq:_*)
+      }:_*)
       val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
       val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
       EasyMock.replay(zkUtils.zkClient)
 
       PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils)
-    })
+    }
   }
 }
 
@@ -163,7 +163,7 @@ private object PartitionAssignorTest extends Logging {
 
   private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkUtils: ZkUtils,
                               verifyAssignmentIsUniform: Boolean = false) {
-    val assignments = scenario.subscriptions.map{ case(consumer, subscription)  =>
+    val assignments = scenario.subscriptions.map { case (consumer, _)  =>
       val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkUtils)
       assignor.assign(ctx).get(consumer)
     }
@@ -200,7 +200,7 @@ private object PartitionAssignorTest extends Logging {
   /** For each consumer stream, count the number of partitions that it owns. */
   private def partitionCountPerStream(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = {
     val ownedCounts = collection.mutable.Map[ConsumerThreadId, Int]()
-    assignment.foreach { case (topicPartition, owner) =>
+    assignment.foreach { case (_, owner) =>
       val updatedCount = ownedCounts.getOrElse(owner, 0) + 1
       ownedCounts.put(owner, updatedCount)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index b054794..df80d1d 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -81,13 +81,12 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
 
     // no messages to consume, we should hit timeout;
     // also the iterator should support re-entrant, so loop it twice
-    for (i <- 0 until 2) {
+    for (_ <- 0 until 2) {
       try {
         getMessages(topicMessageStreams0, nMessages * 2)
         fail("should get an exception")
       } catch {
-        case e: ConsumerTimeoutException => // this is ok
-        case e: Throwable => throw e
+        case _: ConsumerTimeoutException => // this is ok
       }
     }
 
@@ -147,7 +146,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
-    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+    zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
     val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
                         sendMessages(servers, topic, nMessages, 1)
@@ -164,10 +163,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
 
     // call createMesssageStreams twice should throw MessageStreamsExistException
     try {
-      val topicMessageStreams4 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+      zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
       fail("Should fail with MessageStreamsExistException")
     } catch {
-      case e: MessageStreamsExistException => // expected
+      case _: MessageStreamsExistException => // expected
     }
 
     zkConsumerConnector1.shutdown
@@ -235,7 +234,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
-    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
+    zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
     // send some messages to each broker
     val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
                         sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
@@ -304,10 +303,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
       zkConsumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
 
     var receivedMessages: List[String] = Nil
-    for ((topic, messageStreams) <- topicMessageStreams) {
+    for (messageStreams <- topicMessageStreams.values) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator
-        for (i <- 0 until nMessages * 2) {
+        for (_ <- 0 until nMessages * 2) {
           assertTrue(iterator.hasNext())
           val message = iterator.next().message
           receivedMessages ::= message
@@ -390,7 +389,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     // Register consumer rebalance listener
     val rebalanceListener2 = new TestConsumerRebalanceListener()
     zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2)
-    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+    zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
 
     // Consume messages from consumer 1 to make sure it has finished rebalance
     getMessages(topicMessageStreams1, nMessages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 699715b..6430b33 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -100,9 +100,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
           controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
           log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
         } catch {
-          case e : Exception => {
-            log.info("Thread interrupted")
-          }
+          case _: Exception => log.info("Thread interrupted")
         }
       }
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 0a1032f..a52fe48 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -156,7 +156,7 @@ class GroupMetadataManagerTest {
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
-    member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+    member.awaitingJoinCallback = _ => ()
     group.add(memberId, member)
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
@@ -389,7 +389,7 @@ class GroupMetadataManagerTest {
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
-    member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+    member.awaitingJoinCallback = _ => ()
     group.add(memberId, member)
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 8539340..bf695bf 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -252,7 +252,7 @@ class GroupMetadataTest extends JUnitSuite {
       protocolType, List(("roundrobin", Array.empty[Byte])))
 
     group.transitionTo(PreparingRebalance)
-    member.awaitingJoinCallback = (result) => {}
+    member.awaitingJoinCallback = _ => ()
     group.add(memberId, member)
 
     assertEquals(0, group.generationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 4515b94..2221d90 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -83,7 +83,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
       TestUtils.getBrokerListStrFromServers(servers),
       keyEncoder = classOf[StringEncoder].getName)
 
-    for(i <- 0 until numMessages)
+    for(_ <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
     // update offset in zookeeper for consumer to jump "forward" in time
@@ -103,12 +103,12 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
     var received = 0
     val iter = messageStream.iterator
     try {
-      for (i <- 0 until numMessages) {
+      for (_ <- 0 until numMessages) {
         iter.next // will throw a timeout exception if the message isn't there
         received += 1
       }
     } catch {
-      case e: ConsumerTimeoutException => 
+      case _: ConsumerTimeoutException =>
         info("consumer timed out after receiving " + received + " messages.")
     } finally {
       producer.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 881837f..003c04c 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -48,7 +48,7 @@ class FetcherTest extends KafkaServerTestHarness {
 
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkUtils)
     fetcher.stopConnections()
-    val topicInfos = configs.map(c =>
+    val topicInfos = configs.map(_ =>
       new PartitionTopicInfo(topic,
         0,
         queue,
@@ -81,13 +81,10 @@ class FetcherTest extends KafkaServerTestHarness {
 
   def fetch(expected: Int) {
     var count = 0
-    while(true) {
+    while (count < expected) {
       val chunk = queue.poll(2L, TimeUnit.SECONDS)
       assertNotNull("Timed out waiting for data chunk " + (count + 1), chunk)
-      for(message <- chunk.messages)
-        count += 1
-      if(count == expected)
-        return
+      count += chunk.messages.size
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 3998a21..201fa87 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -152,14 +152,14 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
         response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
-        case e: OffsetOutOfRangeException => // This is good.
+        case _: OffsetOutOfRangeException => // This is good.
       }
     }
 
     {
       // send some invalid partitions
       val builder = new FetchRequestBuilder()
-      for((topic, partition) <- topics)
+      for((topic, _) <- topics)
         builder.addFetch(topic, -1, 0, 10000)
 
       try {
@@ -168,7 +168,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
         response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
-        case e: UnknownTopicOrPartitionException => // This is good.
+        case _: UnknownTopicOrPartitionException => // This is good.
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 83cce77..7d8e0c2 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -72,10 +72,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
 
     // call createMesssageStreams twice should throw MessageStreamsExistException
     try {
-      val topicMessageStreams2 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
+      zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
       fail("Should fail with MessageStreamsExistException")
     } catch {
-      case e: MessageStreamsExistException => // expected
+      case _: MessageStreamsExistException => // expected
     }
     zkConsumerConnector1.shutdown
     info("all consumer connectors stopped")

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 8702474..a7f0446 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -46,7 +46,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testFileSize() {
     assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
-    for(i <- 0 until 20) {
+    for (_ <- 0 until 20) {
       messageSet.append(singleMessageSet("abcd".getBytes))
       assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
     } 
@@ -66,9 +66,8 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   
   def testPartialWrite(size: Int, messageSet: FileMessageSet) {
     val buffer = ByteBuffer.allocate(size)
-    val originalPosition = messageSet.channel.position
-    for(i <- 0 until size)
-      buffer.put(0.asInstanceOf[Byte])
+    for (_ <- 0 until size)
+      buffer.put(0: Byte)
     buffer.rewind()
     messageSet.channel.write(buffer)
     // appending those bytes should not change the contents
@@ -195,7 +194,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
       msgSet.truncateTo(43)
       fail("Should throw KafkaException")
     } catch {
-      case e: KafkaException => // expected
+      case _: KafkaException => // expected
     }
 
     EasyMock.verify(channelMock)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 14d24f7..36c61d6 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -120,9 +120,9 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
   }
 
   private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = {
-    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+    for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
       val count = counter
-      val info = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
+      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
       counter += 1
       (key, count)
     }
@@ -185,4 +185,4 @@ object LogCleanerLagIntegrationTest {
       list.add(Array(codec.name))
     list
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 4fa73dc..6e5806f 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -173,15 +173,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     cleanerManager
   }
 
-  private def appendMessagesAndExpireSegments(set: ByteBufferMessageSet, log: Log): Unit = {
-    // append some messages to create some segments
-    for (i <- 0 until 100)
-      log.append(set)
-
-    // expire all segments
-    log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
-  }
-
   private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 24a6366..d80fba1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -254,7 +254,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // create 6 segments with only one message in each segment
     val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
-    for (i <- 0 until 6)
+    for (_ <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
     val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
@@ -272,7 +272,7 @@ class LogCleanerTest extends JUnitSuite {
 
     // create 6 segments with only one message in each segment
     val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
-    for (i <- 0 until 6)
+    for (_ <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
     // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
@@ -281,7 +281,6 @@ class LogCleanerTest extends JUnitSuite {
 
     val expectedCleanSize = segs.take(2).map(_.size).sum
     val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
-    val expectedUncleanableSize = segs.drop(4).map(_.size).sum
 
     assertEquals("Uncleanable bytes of LogToClean should equal size of all segments prior the one containing first dirty",
       logToClean.cleanBytes, expectedCleanSize)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 6d34c78..569264a 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -60,7 +60,7 @@ class LogConfigTest {
       case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
       case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
       case LogConfig.MessageFormatVersionProp => assertPropertyInvalid(name, "")
-      case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
+      case _ => assertPropertyInvalid(name, "not_a_number", "-1")
     })
   }
 
@@ -105,8 +105,4 @@ class LogConfigTest {
     })
   }
 
-  private def randFrom[T](choices: T*): T = {
-    import scala.util.Random
-    choices(Random.nextInt(choices.size))
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 95a1cb5..5421da9 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -88,8 +88,8 @@ class LogManagerTest {
   def testCleanupExpiredSegments() {
     val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
     var offset = 0L
-    for(i <- 0 until 200) {
-      var set = TestUtils.singleMessageSet("test".getBytes())
+    for(_ <- 0 until 200) {
+      val set = TestUtils.singleMessageSet("test".getBytes())
       val info = log.append(set)
       offset = info.lastOffset
     }
@@ -107,7 +107,7 @@ class LogManagerTest {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
     } catch {
-      case e: OffsetOutOfRangeException => // This is good.
+      case _: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
     log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -134,7 +134,7 @@ class LogManagerTest {
 
     // add a bunch of messages that should be larger than the retentionSize
     val numMessages = 200
-    for(i <- 0 until numMessages) {
+    for (_ <- 0 until numMessages) {
       val set = TestUtils.singleMessageSet("test".getBytes())
       val info = log.append(set)
       offset = info.firstOffset
@@ -152,7 +152,7 @@ class LogManagerTest {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
     } catch {
-      case e: OffsetOutOfRangeException => // This is good.
+      case _: OffsetOutOfRangeException => // This is good.
     }
     // log should still be appendable
     log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -168,8 +168,8 @@ class LogManagerTest {
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
     val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
     var offset = 0L
-    for(i <- 0 until 200) {
-      var set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
+    for (_ <- 0 until 200) {
+      val set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
       val info = log.append(set)
       offset = info.lastOffset
     }
@@ -197,8 +197,8 @@ class LogManagerTest {
     logManager.startup
     val log = logManager.createLog(TopicAndPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
-    for(i <- 0 until 200) {
-      var set = TestUtils.singleMessageSet("test".getBytes())
+    for (_ <- 0 until 200) {
+      val set = TestUtils.singleMessageSet("test".getBytes())
       log.append(set)
     }
     time.sleep(logManager.InitialTaskDelayMs)
@@ -215,7 +215,7 @@ class LogManagerTest {
                      TestUtils.tempDir(),
                      TestUtils.tempDir())
     logManager.shutdown()
-    logManager = createLogManager()
+    logManager = createLogManager(dirs)
 
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
@@ -235,7 +235,7 @@ class LogManagerTest {
       createLogManager()
       fail("Should not be able to create a second log manager instance with the same data directory")
     } catch {
-      case e: KafkaException => // this is good
+      case _: KafkaException => // this is good
     }
   }
 
@@ -279,7 +279,7 @@ class LogManagerTest {
                                        logManager: LogManager) {
     val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
     logs.foreach(log => {
-      for(i <- 0 until 50)
+      for (_ <- 0 until 50)
         log.append(TestUtils.singleMessageSet("test".getBytes()))
 
       log.flush()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 04d46de..7f78148 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -138,7 +138,7 @@ class LogSegmentTest {
   def testTruncate() {
     val seg = createSegment(40)
     var offset = 40
-    for(i <- 0 until 30) {
+    for (_ <- 0 until 30) {
       val ms1 = messages(offset, "hello")
       seg.append(offset, Message.NoTimestamp, -1L, ms1)
       val ms2 = messages(offset + 1, "hello")
@@ -160,7 +160,7 @@ class LogSegmentTest {
     val numMessages = 30
     val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1)
     var offset = 40
-    for (i <- 0 until numMessages) {
+    for (_ <- 0 until numMessages) {
       seg.append(offset, offset, offset, messages(offset, "hello"))
       offset += 1
     }
@@ -279,7 +279,7 @@ class LogSegmentTest {
   @Test
   def testRecoveryWithCorruptMessage() {
     val messagesAppended = 20
-    for(iteration <- 0 until 10) {
+    for (_ <- 0 until 10) {
       val seg = createSegment(0)
       for(i <- 0 until messagesAppended)
         seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9ecb651..d18719a 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -163,7 +163,7 @@ class LogTest extends JUnitSuite {
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
-    for (i<- 1 to (msgPerSeg + 1)) {
+    for (_ <- 1 to (msgPerSeg + 1)) {
       log.append(set)
     }
     assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
@@ -326,14 +326,14 @@ class LogTest extends JUnitSuite {
       log.read(0, 1000)
       fail("Reading below the log start offset should throw OffsetOutOfRangeException")
     } catch {
-      case e: OffsetOutOfRangeException => // This is good.
+      case _: OffsetOutOfRangeException => // This is good.
     }
 
     try {
       log.read(1026, 1000)
       fail("Reading at beyond the log end offset should throw OffsetOutOfRangeException")
     } catch {
-      case e: OffsetOutOfRangeException => // This is good.
+      case _: OffsetOutOfRangeException => // This is good.
     }
 
     assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).messageSet.sizeInBytes)
@@ -448,7 +448,7 @@ class LogTest extends JUnitSuite {
       log.append(messageSet)
       fail("message set should throw RecordBatchTooLargeException.")
     } catch {
-      case e: RecordBatchTooLargeException => // this is good
+      case _: RecordBatchTooLargeException => // this is good
     }
   }
 
@@ -475,19 +475,19 @@ class LogTest extends JUnitSuite {
       log.append(messageSetWithUnkeyedMessage)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case e: CorruptRecordException => // this is good
+      case _: CorruptRecordException => // this is good
     }
     try {
       log.append(messageSetWithOneUnkeyedMessage)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case e: CorruptRecordException => // this is good
+      case _: CorruptRecordException => // this is good
     }
     try {
       log.append(messageSetWithCompressedUnkeyedMessage)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case e: CorruptRecordException => // this is good
+      case _: CorruptRecordException => // this is good
     }
 
     // the following should succeed without any InvalidMessageException
@@ -518,7 +518,7 @@ class LogTest extends JUnitSuite {
       log.append(second)
       fail("Second message set should throw MessageSizeTooLargeException.")
     } catch {
-      case e: RecordTooLargeException => // this is good
+      case _: RecordTooLargeException => // this is good
     }
   }
   /**
@@ -725,7 +725,7 @@ class LogTest extends JUnitSuite {
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
-    for (i<- 1 to msgPerSeg)
+    for (_ <- 1 to msgPerSeg)
       log.append(set)
 
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
@@ -746,7 +746,7 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change offset", 0, log.logEndOffset)
     assertEquals("Should change log size", 0, log.size)
 
-    for (i<- 1 to msgPerSeg)
+    for (_ <- 1 to msgPerSeg)
       log.append(set)
 
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
@@ -755,7 +755,7 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1))
     assertEquals("Should change log size", log.size, 0)
 
-    for (i<- 1 to msgPerSeg)
+    for (_ <- 1 to msgPerSeg)
       log.append(set)
 
     assertTrue("Should be ahead of to original offset", log.logEndOffset > msgPerSeg)
@@ -831,7 +831,7 @@ class LogTest extends JUnitSuite {
     assertFalse("The second time index file should have been deleted.", bogusTimeIndex2.exists)
 
     // check that we can append to the log
-    for(i <- 0 until 10)
+    for (_ <- 0 until 10)
       log.append(set)
 
     log.delete()
@@ -857,7 +857,7 @@ class LogTest extends JUnitSuite {
                       time)
 
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
-    for(i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
     log.close()
     log = new Log(logDir,
@@ -892,7 +892,7 @@ class LogTest extends JUnitSuite {
                       time)
 
     // append some messages to create some segments
-    for(i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
 
     // files should be renamed
@@ -934,7 +934,7 @@ class LogTest extends JUnitSuite {
                       time)
 
     // append some messages to create some segments
-    for(i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
 
     // expire all segments
@@ -986,7 +986,7 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     val set = TestUtils.singleMessageSet("test".getBytes)
     val recoveryPoint = 50L
-    for(iteration <- 0 until 50) {
+    for (_ <- 0 until 50) {
       // create a log and write some messages to it
       logDir.mkdirs()
       var log = new Log(logDir,
@@ -995,7 +995,7 @@ class LogTest extends JUnitSuite {
                         time.scheduler,
                         time)
       val numMessages = 50 + TestUtils.random.nextInt(50)
-      for(i <- 0 until numMessages)
+      for (_ <- 0 until numMessages)
         log.append(set)
       val messages = log.logSegments.flatMap(_.log.iterator.toList)
       log.close()
@@ -1033,7 +1033,7 @@ class LogTest extends JUnitSuite {
       recoveryPoint = 0L,
       time.scheduler,
       time)
-    for(i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
     log.close()
 
@@ -1062,7 +1062,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1073,7 +1073,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1086,7 +1086,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1099,7 +1099,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1112,7 +1112,7 @@ class LogTest extends JUnitSuite {
       Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
-      case e: Exception => // its GOOD!
+      case _: Exception => // its GOOD!
     }
   }
 
@@ -1134,7 +1134,7 @@ class LogTest extends JUnitSuite {
       time)
 
     // append some messages to create some segments
-    for (i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
 
     // expire all segments
@@ -1143,7 +1143,7 @@ class LogTest extends JUnitSuite {
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
 
     // append some messages to create some segments
-    for (i <- 0 until 100)
+    for (_ <- 0 until 100)
       log.append(set)
 
     log.delete()
@@ -1158,7 +1158,7 @@ class LogTest extends JUnitSuite {
     val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments
@@ -1171,7 +1171,7 @@ class LogTest extends JUnitSuite {
     val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 15)
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments
@@ -1184,7 +1184,7 @@ class LogTest extends JUnitSuite {
     val log = createLog(set.sizeInBytes, retentionMs = 10000)
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments()
@@ -1197,7 +1197,7 @@ class LogTest extends JUnitSuite {
     val log = createLog(set.sizeInBytes, retentionMs = 10000000)
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments()
@@ -1212,7 +1212,7 @@ class LogTest extends JUnitSuite {
       cleanupPolicy = "compact")
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     // mark oldest segment as older the retention.ms
@@ -1231,7 +1231,7 @@ class LogTest extends JUnitSuite {
       cleanupPolicy = "compact,delete")
 
     // append some messages to create some segments
-    for (i <- 0 until 15)
+    for (_ <- 0 until 15)
       log.append(set)
 
     log.deleteOldSegments()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 869e618..7618cf7 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -157,7 +157,7 @@ class OffsetIndexTest extends JUnitSuite {
     val rand = new Random(1L)
     val vals = new mutable.ArrayBuffer[Int](len)
     var last = base
-    for (i <- 0 until len) {
+    for (_ <- 0 until len) {
       last += rand.nextInt(15) + 1
       vals += last
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
index a5bec17..b079f25 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -62,7 +62,6 @@ class OffsetMapTest extends JUnitSuite {
     val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
     for(i <- 0 until items)
       map.put(key(i), i)
-    var misses = 0
     for(i <- 0 until items)
       assertEquals(map.get(key(i)), i.toLong)
     map
@@ -85,4 +84,4 @@ object OffsetMapTest {
     println(map.size + " entries in map of size " + map.slots + " in " + ellapsedMs + " ms")
     println("Collision rate: %.1f%%".format(100*map.collisionRate))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 18a023c..e2cfb87 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -293,7 +293,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                 messageTimestampDiffMaxMs = 1000L)
       fail("Should throw InvalidMessageException.")
     } catch {
-      case e: InvalidTimestampException =>
+      case _: InvalidTimestampException =>
     }
 
     try {
@@ -306,7 +306,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                           messageTimestampDiffMaxMs = 1000L)
       fail("Should throw InvalidMessageException.")
     } catch {
-      case e: InvalidTimestampException =>
+      case _: InvalidTimestampException =>
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 1438523..e8abfe1 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -94,8 +94,7 @@ class MessageCompressionTest extends JUnitSuite {
       new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream())
       true
     } catch {
-      case e: UnsatisfiedLinkError => false
-      case e: org.xerial.snappy.SnappyError => false
+      case _: UnsatisfiedLinkError | _: org.xerial.snappy.SnappyError => false
     }
   }
 
@@ -104,7 +103,7 @@ class MessageCompressionTest extends JUnitSuite {
       new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
       true
     } catch {
-      case e: UnsatisfiedLinkError => false
+      case _: UnsatisfiedLinkError => false
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index e60f350..da6f260 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -159,7 +159,7 @@ class SocketServerTest extends JUnitSuite {
       outgoing.flush()
       receiveResponse(socket)
     } catch {
-      case e: IOException => // thats fine
+      case _: IOException => // thats fine
     }
   }
 
@@ -186,14 +186,14 @@ class SocketServerTest extends JUnitSuite {
       sendRequest(plainSocket, largeChunkOfBytes, Some(0))
       fail("expected exception when writing to closed plain socket")
     } catch {
-      case e: IOException => // expected
+      case _: IOException => // expected
     }
 
     try {
       sendRequest(traceSocket, largeChunkOfBytes, Some(0))
       fail("expected exception when writing to closed trace socket")
     } catch {
-      case e: IOException => // expected
+      case _: IOException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 3088199..3093b93 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -76,7 +76,7 @@ class AsyncProducerTest {
       fail("Queue should be full")
     }
     catch {
-      case e: QueueFullException => //expected
+      case _: QueueFullException => //expected
     }finally {
       producer.close()
     }
@@ -96,7 +96,7 @@ class AsyncProducerTest {
       fail("should complain that producer is already closed")
     }
     catch {
-      case e: ProducerClosedException => //expected
+      case _: ProducerClosedException => //expected
     }
   }
 
@@ -266,7 +266,7 @@ class AsyncProducerTest {
     }
     catch {
       // should not throw any exception
-      case e: Throwable => fail("Should not throw any exception")
+      case _: Throwable => fail("Should not throw any exception")
 
     }
   }
@@ -298,7 +298,7 @@ class AsyncProducerTest {
       fail("Should fail with FailedToSendMessageException")
     }
     catch {
-      case e: FailedToSendMessageException => // we retry on any exception now
+      case _: FailedToSendMessageException => // we retry on any exception now
     }
   }
 
@@ -317,8 +317,8 @@ class AsyncProducerTest {
       producer.send(getProduceData(1): _*)
       fail("Should fail with ClassCastException due to incompatible Encoder")
     } catch {
-      case e: ClassCastException =>
-    }finally {
+      case _: ClassCastException =>
+    } finally {
       producer.close()
     }
   }
@@ -352,9 +352,9 @@ class AsyncProducerTest {
     val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
     partitionedDataOpt match {
       case Some(partitionedData) =>
-        for ((brokerId, dataPerBroker) <- partitionedData) {
-          for ( (TopicAndPartition(topic, partitionId), dataPerTopic) <- dataPerBroker)
-            assertTrue(partitionId == 0)
+        for (dataPerBroker <- partitionedData.values) {
+          for (tp <- dataPerBroker.keys)
+            assertTrue(tp.partition == 0)
         }
       case None =>
         fail("Failed to collate requests by topic, partition")
@@ -461,7 +461,7 @@ class AsyncProducerTest {
       fail("should complain about wrong config")
     }
     catch {
-      case e: IllegalArgumentException => //expected
+      case _: IllegalArgumentException => //expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index dc73db3..f4a339e 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -115,15 +115,12 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
         keyEncoder = classOf[StringEncoder].getName,
         producerProps = props)
 
-    try{
+    try {
       producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
       fail("Test should fail because the broker list provided are not valid")
     } catch {
-      case e: FailedToSendMessageException => // this is expected
-      case oe: Throwable => fail("fails with exception", oe)
-    } finally {
-      producer1.close()
-    }
+      case _: FailedToSendMessageException => // this is expected
+    } finally producer1.close()
 
     val producer2 = TestUtils.createProducer[String, String](
       brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(server1)),
@@ -216,9 +213,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
         fail("we don't support request.required.acks greater than 1")
     }
     catch {
-      case iae: IllegalArgumentException =>  // this is expected
-      case e: Throwable => fail("Not expected", e)
-
+      case _: IllegalArgumentException =>  // this is expected
     }
   }
 
@@ -261,7 +256,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       fail("Should fail since no leader exists for the partition.")
     } catch {
       case e : TestFailedException => throw e // catch and re-throw the failure message
-      case e2: Throwable => // otherwise success
+      case _: Throwable => // otherwise success
     }
 
     // restart server 1
@@ -290,6 +285,10 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
 
   @Test
   def testAsyncSendCanCorrectlyFailWithTimeout() {
+    val topic = "new-topic"
+    // create topics in ZK
+    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+
     val timeoutMs = 500
     val props = new Properties()
     props.put("request.timeout.ms", String.valueOf(timeoutMs))
@@ -303,10 +302,6 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       partitioner = classOf[StaticPartitioner].getName,
       producerProps = props)
 
-    val topic = "new-topic"
-    // create topics in ZK
-    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-
     // do a simple test to make sure plumbing is okay
     try {
       // this message should be assigned to partition 0 whose leader is on broker 0
@@ -315,30 +310,25 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
       val messageSet1 = response1.messageSet("new-topic", 0).iterator
       assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet1.next.message)
-    } catch {
-      case e: Throwable => case e: Exception => producer.close; fail("Not expected", e)
-    }
-
-    // stop IO threads and request handling, but leave networking operational
-    // any requests should be accepted and queue up, but not handled
-    server1.requestHandlerPool.shutdown()
-
-    val t1 = SystemTime.milliseconds
-    try {
-      // this message should be assigned to partition 0 whose leader is on broker 0, but
-      // broker 0 will not response within timeoutMs millis.
-      producer.send(new KeyedMessage[String, String](topic, "test", "test"))
-    } catch {
-      case e: FailedToSendMessageException => /* success */
-      case e: Exception => fail("Not expected", e)
-    } finally {
-      producer.close()
-    }
-    val t2 = SystemTime.milliseconds
-
-    // make sure we don't wait fewer than timeoutMs
-    assertTrue((t2-t1) >= timeoutMs)
+      assertEquals(ByteBuffer.wrap("test".getBytes), messageSet1.next.message.payload)
+
+      // stop IO threads and request handling, but leave networking operational
+      // any requests should be accepted and queue up, but not handled
+      server1.requestHandlerPool.shutdown()
+
+      val t1 = SystemTime.milliseconds
+      try {
+        // this message should be assigned to partition 0 whose leader is on broker 0, but
+        // broker 0 will not response within timeoutMs millis.
+        producer.send(new KeyedMessage[String, String](topic, "test", "test"))
+      } catch {
+        case _: FailedToSendMessageException => /* success */
+      }
+      val t2 = SystemTime.milliseconds
+      // make sure we don't wait fewer than timeoutMs
+      assertTrue((t2-t1) >= timeoutMs)
+
+    } finally producer.close()
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 968ea4b..7e72eec 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -151,8 +151,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
       producer.send(produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
     } catch {
-      case e : java.io.IOException => // success
-      case e2: Throwable => throw e2
+      case _ : java.io.IOException => // success
     }
   }
 
@@ -222,8 +221,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
       producer.send(request)
       fail("Should have received timeout exception since request handling is stopped.")
     } catch {
-      case e: SocketTimeoutException => /* success */
-      case e: Throwable => fail("Unexpected exception when expecting timeout: " + e)
+      case _: SocketTimeoutException => /* success */
     }
     val t2 = SystemTime.milliseconds
     // make sure we don't wait fewer than timeoutMs for a response

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
index 0a28e8f..1df34ea 100644
--- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
@@ -31,7 +31,7 @@ class OperationTest extends JUnitSuite {
       Operation.fromString("badName")
       fail("Expected exception on invalid operation name.")
     } catch {
-      case e: KafkaException => // expected
+      case _: KafkaException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
index 6cd1ad7..1b1c864 100644
--- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
@@ -31,7 +31,7 @@ class PermissionTypeTest extends JUnitSuite {
       PermissionType.fromString("badName")
       fail("Expected exception on invalid PermissionType name.")
     } catch {
-      case e: KafkaException => // expected
+      case _: KafkaException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
index 3653a7d..546c92e 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
@@ -31,7 +31,7 @@ class ResourceTypeTest extends JUnitSuite {
       ResourceType.fromString("badName")
       fail("Expected exception on invalid ResourceType name.")
     } catch {
-      case e: KafkaException => // expected
+      case _: KafkaException => // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 1f52af4..0765992 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -347,7 +347,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
 
     // Alternate authorizer to keep adding and removing zookeeper path
-    val concurrentFuctions = (0 to 50).map { i =>
+    val concurrentFuctions = (0 to 50).map { _ =>
       () => {
         simpleAclAuthorizer.addAcls(Set(acl), resource)
         simpleAclAuthorizer2.removeAcls(Set(acl), resource)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 9203130..8cec0c7 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -65,7 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
       JaasUtils.isZkSecurityEnabled()
       fail("Should have thrown an exception")
     } catch {
-      case e: KafkaException => // Expected
+      case _: KafkaException => // Expected
     }
   }
 
@@ -286,7 +286,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     
     // Fail the test if able to delete
     result match {
-      case Success(v) => // All done
+      case Success(_) => // All done
       case Failure(e) => fail(e.getMessage)
     }
   }
@@ -302,7 +302,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
         case "/" => deleteRecursive(zkUtils, s"/$child")
         case path => deleteRecursive(zkUtils, s"$path/$child")
       }) match {
-        case Success(v) => result
+        case Success(_) => result
         case Failure(e) => Failure(e)
       }
     path match {
@@ -314,7 +314,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
           zkUtils.deletePath(path)
           Failure(new Exception(s"Have been able to delete $path"))
         } catch {
-          case e: Exception => result
+          case _: Exception => result
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 291e822..b581341 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -229,7 +229,7 @@ class ClientQuotaManagerTest {
       /* We have 10 second windows. Make sure that there is no quota violation
        * if we produce under the quota
        */
-      for (i <- 0 until 10) {
+      for (_ <- 0 until 10) {
         clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
         time.sleep(1000)
       }
@@ -256,7 +256,7 @@ class ClientQuotaManagerTest {
       assertEquals(11, numCallbacks)
 
       // Could continue to see delays until the bursty sample disappears
-      for (i <- 0 until 10) {
+      for (_ <- 0 until 10) {
         clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
         time.sleep(1000)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index faa23f0..2e50d30 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -168,7 +168,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
       AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
       fail("Should fail with AdminOperationException for topic doesn't exist")
     } catch {
-      case e: AdminOperationException => // expected
+      case _: AdminOperationException => // expected
     }
   }
 
@@ -198,7 +198,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
-      case t: Throwable =>
+      case _: Throwable =>
     }
     // Version is provided. EntityType is incorrect
     try {
@@ -207,7 +207,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
-      case t: Throwable =>
+      case _: Throwable =>
     }
 
     // EntityName isn't provided
@@ -217,7 +217,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
       fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
     }
     catch {
-      case t: Throwable =>
+      case _: Throwable =>
     }
 
     // Everything is provided

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 540a665..0051247 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.message.MessageSet
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 
-
 class IsrExpirationTest {
 
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
@@ -52,7 +51,8 @@ class IsrExpirationTest {
 
   @Before
   def setUp() {
-    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
+    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false),
+      QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
   }
 
   @After
@@ -66,7 +66,7 @@ class IsrExpirationTest {
    */
   @Test
   def testIsrExpirationForStuckFollowers() {
-    val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
+    val log = logMock
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
@@ -75,8 +75,7 @@ class IsrExpirationTest {
 
     // let the follower catch up to the Leader logEndOffset (15)
     (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
-                                                                 MessageSet.Empty),
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty),
                                                    -1L,
                                                    -1,
                                                    true)))
@@ -97,7 +96,7 @@ class IsrExpirationTest {
    */
   @Test
   def testIsrExpirationIfNoFetchRequestMade() {
-    val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L
+    val log = logMock
 
     // create one partition and all replicas
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
@@ -119,7 +118,7 @@ class IsrExpirationTest {
   @Test
   def testIsrExpirationForSlowFollowers() {
     // create leader replica
-    val log = getLogWithLogEndOffset(15L, 4)
+    val log = logMock
     // add one partition
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
@@ -158,7 +157,7 @@ class IsrExpirationTest {
 
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
-    val leaderId=config.brokerId
+    val leaderId = config.brokerId
     val partition = replicaManager.getOrCreatePartition(topic, partitionId)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
@@ -171,12 +170,10 @@ class IsrExpirationTest {
     partition
   }
 
-  private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = {
-    val log1 = EasyMock.createMock(classOf[kafka.log.Log])
-    EasyMock.expect(log1.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(logEndOffset)).times(expectedCalls)
-    EasyMock.replay(log1)
-
-    log1
+  private def logMock: Log = {
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.replay(log)
+    log
   }
 
   private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 042dabf..93d0413 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -112,7 +112,7 @@ class KafkaConfigTest {
     props5.put("log.retention.ms", "0")
 
     intercept[IllegalArgumentException] {
-      val cfg5 = KafkaConfig.fromProps(props5)
+      KafkaConfig.fromProps(props5)
     }
   }
 
@@ -127,13 +127,13 @@ class KafkaConfigTest {
     props3.put("log.retention.hours", "0")
 
     intercept[IllegalArgumentException] {
-      val cfg1 = KafkaConfig.fromProps(props1)
+      KafkaConfig.fromProps(props1)
     }
     intercept[IllegalArgumentException] {
-      val cfg2 = KafkaConfig.fromProps(props2)
+      KafkaConfig.fromProps(props2)
     }
     intercept[IllegalArgumentException] {
-      val cfg3 = KafkaConfig.fromProps(props3)
+      KafkaConfig.fromProps(props3)
     }
 
   }
@@ -303,7 +303,7 @@ class KafkaConfigTest {
       KafkaConfig.fromProps(props)
       true
     } catch {
-      case e: IllegalArgumentException => false
+      case _: IllegalArgumentException => false
     }
   }
 
@@ -561,8 +561,7 @@ class KafkaConfigTest {
         case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
         case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
         case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
-
-        case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
+        case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 0885709..0f3ee63 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -90,7 +90,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val log = logManager.getLog(TopicAndPartition(topic, part)).get
 
     val message = new Message(Integer.toString(42).getBytes())
-    for(i <- 0 until 20)
+    for (_ <- 0 until 20)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
@@ -125,7 +125,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
 
     var offsetChanged = false
-    for(i <- 1 to 14) {
+    for (_ <- 1 to 14) {
       val topicAndPartition = TopicAndPartition(topic, 0)
       val offsetRequest =
         OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
@@ -151,7 +151,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val logManager = server.getLogManager
     val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
     val message = new Message(Integer.toString(42).getBytes())
-    for(i <- 0 until 20)
+    for (_ <- 0 until 20)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
@@ -180,7 +180,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val logManager = server.getLogManager
     val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
     val message = new Message(Integer.toString(42).getBytes())
-    for(i <- 0 until 20)
+    for (_ <- 0 until 20)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index b34c93d..138c36d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -267,7 +267,7 @@ class MetadataCacheTest {
       fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead")
     }
     catch {
-      case e: BrokerEndPointNotAvailableException => //expected
+      case _: BrokerEndPointNotAvailableException => //expected
     }
 
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 29eaf2d..64c67d6 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -296,7 +296,6 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     createTopic(zkUtils, topic2, servers = Seq(server), numPartitions = 1)
 
     // Commit an offset
-    val expectedReplicaAssignment = Map(0  -> List(1))
     val commitRequest = OffsetCommitRequest(group, immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L),
       TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 825b2b4..3a09737 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -120,7 +120,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Add data equally to each partition
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
-    (0 until msgCount).foreach { x =>
+    (0 until msgCount).foreach { _ =>
       (0 to 7).foreach { partition =>
         producer.send(new ProducerRecord(topic, partition, null, msg))
       }
@@ -215,7 +215,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
   def addData(msgCount: Int, msg: Array[Byte]): Boolean = {
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
-    (0 until msgCount).foreach { x => producer.send(new ProducerRecord(topic, msg)).get }
+    (0 until msgCount).map(_ => producer.send(new ProducerRecord(topic, msg))).foreach(_.get)
     waitForOffsetsToMatch(msgCount, 0, 100)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 4e4fb95..9e19e39 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -56,7 +56,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
         sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0)
         fail("Versions Request during Sasl handshake did not fail")
       } catch {
-        case ioe: IOException => // expected exception
+        case _: IOException => // expected exception
       }
     } finally {
       plaintextSocket.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index e0b6db4..f21f2de 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -130,7 +130,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     try {
       server1.startup()
     } catch {
-      case e: kafka.common.InconsistentBrokerIdException => //success
+      case _: kafka.common.InconsistentBrokerIdException => //success
     }
     server1.shutdown()
     CoreUtils.delete(server1.config.logDirs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index bc71edd..fd0a460 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -129,16 +129,14 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     val server = new KafkaServer(newConfig, threadNamePrefix = Option(this.getClass.getName))
     try {
       server.startup()
-      fail("Expected KafkaServer setup to fail, throw exception")
+      fail("Expected KafkaServer setup to fail and throw exception")
     }
     catch {
       // Try to clean up carefully without hanging even if the test fails. This means trying to accurately
       // identify the correct exception, making sure the server was shutdown, and cleaning up if anything
       // goes wrong so that awaitShutdown doesn't hang
-      case e: org.I0Itec.zkclient.exception.ZkException =>
+      case _: org.I0Itec.zkclient.exception.ZkException =>
         assertEquals(NotRunning.state, server.brokerState.currentState)
-      case e: Throwable =>
-        fail("Expected ZkException during Kafka server starting up but caught a different exception %s".format(e.toString))
     }
     finally {
       if (server.brokerState.currentState != NotRunning.state)
@@ -170,7 +168,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
       assertTrue(true)
     }
     catch{
-      case ex: Throwable => fail()
+      case _: Throwable => fail()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index b5560c3..86b167e 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -58,7 +58,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
       TestUtils.createServer(KafkaConfig.fromProps(props2))
       fail("Registering a broker with a conflicting id should fail")
     } catch {
-      case e : RuntimeException =>
+      case _: RuntimeException =>
       // this is expected
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index 741eec9..da80c0d 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -62,7 +62,7 @@ class ConsoleProducerTest {
       new ConsoleProducer.ProducerConfig(invalidArgs)
       Assert.fail("Should have thrown an UnrecognizedOptionException")
     } catch {
-      case e: joptsimple.OptionException => // expected exception
+      case _: joptsimple.OptionException => // expected exception
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3796e48..83fd6b8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -822,14 +822,14 @@ object TestUtils extends Logging {
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
     val file = new RandomAccessFile(fileName, "rw")
     file.seek(position)
-    for(i <- 0 until size)
+    for (_ <- 0 until size)
       file.writeByte(random.nextInt(255))
     file.close()
   }
 
   def appendNonsenseToFile(fileName: File, size: Int) {
     val file = new FileOutputStream(fileName, true)
-    for(i <- 0 until size)
+    for (_ <- 0 until size)
       file.write(random.nextInt(255))
     file.close()
   }
@@ -984,7 +984,7 @@ object TestUtils extends Logging {
 
     var messages: List[String] = Nil
     val shouldGetAllMessages = nMessagesPerThread < 0
-    for ((topic, messageStreams) <- topicMessageStreams) {
+    for (messageStreams <- topicMessageStreams.values) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator()
         try {
@@ -1117,7 +1117,7 @@ object TestUtils extends Logging {
           }
       }
     } catch {
-      case ie: InterruptedException => failWithTimeout()
+      case _: InterruptedException => failWithTimeout()
       case e: Throwable => exceptions += e
     } finally {
       threadPool.shutdownNow()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
index 29c9067..64129e9 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
@@ -35,8 +35,6 @@ class TimerTaskListTest {
   @Test
   def testAll() {
     val sharedCounter = new AtomicInteger(0)
-    val runCounter = new AtomicInteger(0)
-    val execCounter = new AtomicInteger(0)
     val list1 = new TimerTaskList(sharedCounter)
     val list2 = new TimerTaskList(sharedCounter)
     val list3 = new TimerTaskList(sharedCounter)
@@ -46,7 +44,7 @@ class TimerTaskListTest {
       list1.add(new TimerTaskEntry(task, 10L))
       assertEquals(i, sharedCounter.get)
       task
-    }.toSeq
+    }
 
     assertEquals(tasks.size, sharedCounter.get)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index c2c25ed..4d57ed9 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -80,7 +80,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     try {
       zkUtils.createEphemeralPathExpectConflict("/tmp/zktest", "node created")
     } catch {                       
-      case e: Exception =>
+      case _: Exception =>
     }
 
     var testData: String = null
@@ -147,7 +147,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
         zwe.create()
         false
       } catch {
-        case e: ZkNodeExistsException => true
+        case _: ZkNodeExistsException => true
       }
     Assert.assertTrue(gotException)
     zkClient2.close()
@@ -155,7 +155,6 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
   
   /**
    * Tests if succeeds with znode from the same session
-   * 
    */
   @Test
   def testSameSession = {
@@ -171,7 +170,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
         zwe.create()
         false
       } catch {
-        case e: ZkNodeExistsException => true
+        case _: ZkNodeExistsException => true
       }
     Assert.assertFalse(gotException)
   }


Mime
View raw message