http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 653b40c..b7fc657 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -52,7 +52,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
@Before
override def setUp() {
super.setUp()
- topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+ topicInfos = configs.map(_ => new PartitionTopicInfo(topic,
0,
queue,
new AtomicLong(consumedOffset),
@@ -77,7 +77,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
new StringDecoder(),
new StringDecoder(),
clientId = "")
- val receivedMessages = (0 until 5).map(i => iter.next.message).toList
+ val receivedMessages = (0 until 5).map(_ => iter.next.message)
assertFalse(iter.hasNext)
assertEquals(0, queue.size) // Shutdown command has been consumed.
@@ -101,17 +101,14 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
new FailDecoder(),
clientId = "")
- val receivedMessages = (0 until 5).map{ i =>
+ (0 until 5).foreach { i =>
assertTrue(iter.hasNext)
val message = iter.next
assertEquals(message.offset, i + consumedOffset)
- try {
- message.message // should fail
- }
+ try message.message // should fail
catch {
- case e: UnsupportedOperationException => // this is ok
- case e2: Throwable => fail("Unexpected exception when iterating the message set. " + e2.getMessage)
+ case _: UnsupportedOperationException => // this is ok
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 818229c..9e568f8 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -35,49 +35,49 @@ class PartitionAssignorTest extends Logging {
val assignor = new RoundRobinAssignor
/** various scenarios with only wildcard consumers */
- (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+ (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
val topicPartitionCounts = Map((1 to topicCount).map(topic => {
("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
- }).toSeq:_*)
+ }):_*)
- val subscriptions = Map((1 to consumerCount).map(consumer => {
+ val subscriptions = Map((1 to consumerCount).map { consumer =>
val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))
- }).toSeq:_*)
+ }:_*)
val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
EasyMock.replay(zkUtils.zkClient)
PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils, verifyAssignmentIsUniform = true)
- })
+ }
}
@Test
def testRangePartitionAssignor() {
val assignor = new RangeAssignor
- (1 to PartitionAssignorTest.TestCaseCount).foreach (testCase => {
+ (1 to PartitionAssignorTest.TestCaseCount).foreach { _ =>
val consumerCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxConsumerCount + 1))
val topicCount = PartitionAssignorTest.MinTopicCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxTopicCount + 1))
val topicPartitionCounts = Map((1 to topicCount).map(topic => {
("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
- }).toSeq:_*)
+ }):_*)
- val subscriptions = Map((1 to consumerCount).map(consumer => {
+ val subscriptions = Map((1 to consumerCount).map { consumer =>
val streamCounts = Map((1 to topicCount).map(topic => {
val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
("topic-" + topic, streamCount)
- }).toSeq:_*)
+ }):_*)
("g1c" + consumer, StaticSubscriptionInfo(streamCounts))
- }).toSeq:_*)
+ }:_*)
val scenario = Scenario("g1", topicPartitionCounts, subscriptions)
val zkUtils = PartitionAssignorTest.setupZkClientMock(scenario)
EasyMock.replay(zkUtils.zkClient)
PartitionAssignorTest.assignAndVerify(scenario, assignor, zkUtils)
- })
+ }
}
}
@@ -163,7 +163,7 @@ private object PartitionAssignorTest extends Logging {
private def assignAndVerify(scenario: Scenario, assignor: PartitionAssignor, zkUtils: ZkUtils,
verifyAssignmentIsUniform: Boolean = false) {
- val assignments = scenario.subscriptions.map{ case(consumer, subscription) =>
+ val assignments = scenario.subscriptions.map { case (consumer, _) =>
val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkUtils)
assignor.assign(ctx).get(consumer)
}
@@ -200,7 +200,7 @@ private object PartitionAssignorTest extends Logging {
/** For each consumer stream, count the number of partitions that it owns. */
private def partitionCountPerStream(assignment: collection.Map[TopicAndPartition, ConsumerThreadId]) = {
val ownedCounts = collection.mutable.Map[ConsumerThreadId, Int]()
- assignment.foreach { case (topicPartition, owner) =>
+ assignment.foreach { case (_, owner) =>
val updatedCount = ownedCounts.getOrElse(owner, 0) + 1
ownedCounts.put(owner, updatedCount)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index b054794..df80d1d 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -81,13 +81,12 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
// no messages to consume, we should hit timeout;
// also the iterator should support re-entrant, so loop it twice
- for (i <- 0 until 2) {
+ for (_ <- 0 until 2) {
try {
getMessages(topicMessageStreams0, nMessages * 2)
fail("should get an exception")
} catch {
- case e: ConsumerTimeoutException => // this is ok
- case e: Throwable => throw e
+ case _: ConsumerTimeoutException => // this is ok
}
}
@@ -147,7 +146,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val consumerConfig3 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer3))
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
- val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+ zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
// send some messages to each broker
val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++
sendMessages(servers, topic, nMessages, 1)
@@ -164,10 +163,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
// call createMesssageStreams twice should throw MessageStreamsExistException
try {
- val topicMessageStreams4 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+ zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
fail("Should fail with MessageStreamsExistException")
} catch {
- case e: MessageStreamsExistException => // expected
+ case _: MessageStreamsExistException => // expected
}
zkConsumerConnector1.shutdown
@@ -235,7 +234,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
val consumerConfig3 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer3))
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
- val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
+ zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
// send some messages to each broker
val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++
sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec)
@@ -304,10 +303,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
zkConsumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
var receivedMessages: List[String] = Nil
- for ((topic, messageStreams) <- topicMessageStreams) {
+ for (messageStreams <- topicMessageStreams.values) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator
- for (i <- 0 until nMessages * 2) {
+ for (_ <- 0 until nMessages * 2) {
assertTrue(iterator.hasNext())
val message = iterator.next().message
receivedMessages ::= message
@@ -390,7 +389,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
// Register consumer rebalance listener
val rebalanceListener2 = new TestConsumerRebalanceListener()
zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2)
- val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+ zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// Consume messages from consumer 1 to make sure it has finished rebalance
getMessages(topicMessageStreams1, nMessages)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 699715b..6430b33 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -100,9 +100,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition))
log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0)))
} catch {
- case e : Exception => {
- log.info("Thread interrupted")
- }
+ case _: Exception => log.info("Thread interrupted")
}
}
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 0a1032f..a52fe48 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -156,7 +156,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
- member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+ member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
@@ -389,7 +389,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
- member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {}
+ member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 8539340..bf695bf 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -252,7 +252,7 @@ class GroupMetadataTest extends JUnitSuite {
protocolType, List(("roundrobin", Array.empty[Byte])))
group.transitionTo(PreparingRebalance)
- member.awaitingJoinCallback = (result) => {}
+ member.awaitingJoinCallback = _ => ()
group.add(memberId, member)
assertEquals(0, group.generationId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 4515b94..2221d90 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -83,7 +83,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
TestUtils.getBrokerListStrFromServers(servers),
keyEncoder = classOf[StringEncoder].getName)
- for(i <- 0 until numMessages)
+ for(_ <- 0 until numMessages)
producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
// update offset in zookeeper for consumer to jump "forward" in time
@@ -103,12 +103,12 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
var received = 0
val iter = messageStream.iterator
try {
- for (i <- 0 until numMessages) {
+ for (_ <- 0 until numMessages) {
iter.next // will throw a timeout exception if the message isn't there
received += 1
}
} catch {
- case e: ConsumerTimeoutException =>
+ case _: ConsumerTimeoutException =>
info("consumer timed out after receiving " + received + " messages.")
} finally {
producer.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 881837f..003c04c 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -48,7 +48,7 @@ class FetcherTest extends KafkaServerTestHarness {
fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkUtils)
fetcher.stopConnections()
- val topicInfos = configs.map(c =>
+ val topicInfos = configs.map(_ =>
new PartitionTopicInfo(topic,
0,
queue,
@@ -81,13 +81,10 @@ class FetcherTest extends KafkaServerTestHarness {
def fetch(expected: Int) {
var count = 0
- while(true) {
+ while (count < expected) {
val chunk = queue.poll(2L, TimeUnit.SECONDS)
assertNotNull("Timed out waiting for data chunk " + (count + 1), chunk)
- for(message <- chunk.messages)
- count += 1
- if(count == expected)
- return
+ count += chunk.messages.size
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 3998a21..201fa87 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -152,14 +152,14 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
fail("Expected exception when fetching message with invalid offset")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
}
{
// send some invalid partitions
val builder = new FetchRequestBuilder()
- for((topic, partition) <- topics)
+ for((topic, _) <- topics)
builder.addFetch(topic, -1, 0, 10000)
try {
@@ -168,7 +168,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
response.data.foreach(pdata => ErrorMapping.maybeThrowException(pdata._2.error))
fail("Expected exception when fetching message with invalid partition")
} catch {
- case e: UnknownTopicOrPartitionException => // This is good.
+ case _: UnknownTopicOrPartitionException => // This is good.
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 83cce77..7d8e0c2 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -72,10 +72,10 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
// call createMesssageStreams twice should throw MessageStreamsExistException
try {
- val topicMessageStreams2 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
+ zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
fail("Should fail with MessageStreamsExistException")
} catch {
- case e: MessageStreamsExistException => // expected
+ case _: MessageStreamsExistException => // expected
}
zkConsumerConnector1.shutdown
info("all consumer connectors stopped")
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 8702474..a7f0446 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -46,7 +46,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
@Test
def testFileSize() {
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
- for(i <- 0 until 20) {
+ for (_ <- 0 until 20) {
messageSet.append(singleMessageSet("abcd".getBytes))
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
}
@@ -66,9 +66,8 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
def testPartialWrite(size: Int, messageSet: FileMessageSet) {
val buffer = ByteBuffer.allocate(size)
- val originalPosition = messageSet.channel.position
- for(i <- 0 until size)
- buffer.put(0.asInstanceOf[Byte])
+ for (_ <- 0 until size)
+ buffer.put(0: Byte)
buffer.rewind()
messageSet.channel.write(buffer)
// appending those bytes should not change the contents
@@ -195,7 +194,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
msgSet.truncateTo(43)
fail("Should throw KafkaException")
} catch {
- case e: KafkaException => // expected
+ case _: KafkaException => // expected
}
EasyMock.verify(channelMock)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 14d24f7..36c61d6 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -120,9 +120,9 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
}
private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = {
- for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+ for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
val count = counter
- val info = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
+ log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
counter += 1
(key, count)
}
@@ -185,4 +185,4 @@ object LogCleanerLagIntegrationTest {
list.add(Array(codec.name))
list
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 4fa73dc..6e5806f 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -173,15 +173,6 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
cleanerManager
}
- private def appendMessagesAndExpireSegments(set: ByteBufferMessageSet, log: Log): Unit = {
- // append some messages to create some segments
- for (i <- 0 until 100)
- log.append(set)
-
- // expire all segments
- log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
- }
-
private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 24a6366..d80fba1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -254,7 +254,7 @@ class LogCleanerTest extends JUnitSuite {
// create 6 segments with only one message in each segment
val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
- for (i <- 0 until 6)
+ for (_ <- 0 until 6)
log.append(messageSet, assignOffsets = true)
val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
@@ -272,7 +272,7 @@ class LogCleanerTest extends JUnitSuite {
// create 6 segments with only one message in each segment
val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
- for (i <- 0 until 6)
+ for (_ <- 0 until 6)
log.append(messageSet, assignOffsets = true)
// segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
@@ -281,7 +281,6 @@ class LogCleanerTest extends JUnitSuite {
val expectedCleanSize = segs.take(2).map(_.size).sum
val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
- val expectedUncleanableSize = segs.drop(4).map(_.size).sum
assertEquals("Uncleanable bytes of LogToClean should equal size of all segments prior the one containing first dirty",
logToClean.cleanBytes, expectedCleanSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 6d34c78..569264a 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -60,7 +60,7 @@ class LogConfigTest {
case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
case LogConfig.MessageFormatVersionProp => assertPropertyInvalid(name, "")
- case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
+ case _ => assertPropertyInvalid(name, "not_a_number", "-1")
})
}
@@ -105,8 +105,4 @@ class LogConfigTest {
})
}
- private def randFrom[T](choices: T*): T = {
- import scala.util.Random
- choices(Random.nextInt(choices.size))
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 95a1cb5..5421da9 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -88,8 +88,8 @@ class LogManagerTest {
def testCleanupExpiredSegments() {
val log = logManager.createLog(TopicAndPartition(name, 0), logConfig)
var offset = 0L
- for(i <- 0 until 200) {
- var set = TestUtils.singleMessageSet("test".getBytes())
+ for(_ <- 0 until 200) {
+ val set = TestUtils.singleMessageSet("test".getBytes())
val info = log.append(set)
offset = info.lastOffset
}
@@ -107,7 +107,7 @@ class LogManagerTest {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
// log should still be appendable
log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -134,7 +134,7 @@ class LogManagerTest {
// add a bunch of messages that should be larger than the retentionSize
val numMessages = 200
- for(i <- 0 until numMessages) {
+ for (_ <- 0 until numMessages) {
val set = TestUtils.singleMessageSet("test".getBytes())
val info = log.append(set)
offset = info.firstOffset
@@ -152,7 +152,7 @@ class LogManagerTest {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
// log should still be appendable
log.append(TestUtils.singleMessageSet("test".getBytes()))
@@ -168,8 +168,8 @@ class LogManagerTest {
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
val log = logManager.createLog(TopicAndPartition(name, 0), LogConfig.fromProps(logConfig.originals, logProps))
var offset = 0L
- for(i <- 0 until 200) {
- var set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
+ for (_ <- 0 until 200) {
+ val set = TestUtils.singleMessageSet("test".getBytes(), key="test".getBytes())
val info = log.append(set)
offset = info.lastOffset
}
@@ -197,8 +197,8 @@ class LogManagerTest {
logManager.startup
val log = logManager.createLog(TopicAndPartition(name, 0), config)
val lastFlush = log.lastFlushTime
- for(i <- 0 until 200) {
- var set = TestUtils.singleMessageSet("test".getBytes())
+ for (_ <- 0 until 200) {
+ val set = TestUtils.singleMessageSet("test".getBytes())
log.append(set)
}
time.sleep(logManager.InitialTaskDelayMs)
@@ -215,7 +215,7 @@ class LogManagerTest {
TestUtils.tempDir(),
TestUtils.tempDir())
logManager.shutdown()
- logManager = createLogManager()
+ logManager = createLogManager(dirs)
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
@@ -235,7 +235,7 @@ class LogManagerTest {
createLogManager()
fail("Should not be able to create a second log manager instance with the same data directory")
} catch {
- case e: KafkaException => // this is good
+ case _: KafkaException => // this is good
}
}
@@ -279,7 +279,7 @@ class LogManagerTest {
logManager: LogManager) {
val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
logs.foreach(log => {
- for(i <- 0 until 50)
+ for (_ <- 0 until 50)
log.append(TestUtils.singleMessageSet("test".getBytes()))
log.flush()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 04d46de..7f78148 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -138,7 +138,7 @@ class LogSegmentTest {
def testTruncate() {
val seg = createSegment(40)
var offset = 40
- for(i <- 0 until 30) {
+ for (_ <- 0 until 30) {
val ms1 = messages(offset, "hello")
seg.append(offset, Message.NoTimestamp, -1L, ms1)
val ms2 = messages(offset + 1, "hello")
@@ -160,7 +160,7 @@ class LogSegmentTest {
val numMessages = 30
val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1)
var offset = 40
- for (i <- 0 until numMessages) {
+ for (_ <- 0 until numMessages) {
seg.append(offset, offset, offset, messages(offset, "hello"))
offset += 1
}
@@ -279,7 +279,7 @@ class LogSegmentTest {
@Test
def testRecoveryWithCorruptMessage() {
val messagesAppended = 20
- for(iteration <- 0 until 10) {
+ for (_ <- 0 until 10) {
val seg = createSegment(0)
for(i <- 0 until messagesAppended)
seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 9ecb651..d18719a 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -163,7 +163,7 @@ class LogTest extends JUnitSuite {
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
- for (i<- 1 to (msgPerSeg + 1)) {
+ for (_ <- 1 to (msgPerSeg + 1)) {
log.append(set)
}
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
@@ -326,14 +326,14 @@ class LogTest extends JUnitSuite {
log.read(0, 1000)
fail("Reading below the log start offset should throw OffsetOutOfRangeException")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
try {
log.read(1026, 1000)
fail("Reading at beyond the log end offset should throw OffsetOutOfRangeException")
} catch {
- case e: OffsetOutOfRangeException => // This is good.
+ case _: OffsetOutOfRangeException => // This is good.
}
assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, log.read(1025, 1000, Some(1024)).messageSet.sizeInBytes)
@@ -448,7 +448,7 @@ class LogTest extends JUnitSuite {
log.append(messageSet)
fail("message set should throw RecordBatchTooLargeException.")
} catch {
- case e: RecordBatchTooLargeException => // this is good
+ case _: RecordBatchTooLargeException => // this is good
}
}
@@ -475,19 +475,19 @@ class LogTest extends JUnitSuite {
log.append(messageSetWithUnkeyedMessage)
fail("Compacted topics cannot accept a message without a key.")
} catch {
- case e: CorruptRecordException => // this is good
+ case _: CorruptRecordException => // this is good
}
try {
log.append(messageSetWithOneUnkeyedMessage)
fail("Compacted topics cannot accept a message without a key.")
} catch {
- case e: CorruptRecordException => // this is good
+ case _: CorruptRecordException => // this is good
}
try {
log.append(messageSetWithCompressedUnkeyedMessage)
fail("Compacted topics cannot accept a message without a key.")
} catch {
- case e: CorruptRecordException => // this is good
+ case _: CorruptRecordException => // this is good
}
// the following should succeed without any InvalidMessageException
@@ -518,7 +518,7 @@ class LogTest extends JUnitSuite {
log.append(second)
fail("Second message set should throw MessageSizeTooLargeException.")
} catch {
- case e: RecordTooLargeException => // this is good
+ case _: RecordTooLargeException => // this is good
}
}
/**
@@ -725,7 +725,7 @@ class LogTest extends JUnitSuite {
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
- for (i<- 1 to msgPerSeg)
+ for (_ <- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
@@ -746,7 +746,7 @@ class LogTest extends JUnitSuite {
assertEquals("Should change offset", 0, log.logEndOffset)
assertEquals("Should change log size", 0, log.size)
- for (i<- 1 to msgPerSeg)
+ for (_ <- 1 to msgPerSeg)
log.append(set)
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
@@ -755,7 +755,7 @@ class LogTest extends JUnitSuite {
assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1))
assertEquals("Should change log size", log.size, 0)
- for (i<- 1 to msgPerSeg)
+ for (_ <- 1 to msgPerSeg)
log.append(set)
assertTrue("Should be ahead of to original offset", log.logEndOffset > msgPerSeg)
@@ -831,7 +831,7 @@ class LogTest extends JUnitSuite {
assertFalse("The second time index file should have been deleted.", bogusTimeIndex2.exists)
// check that we can append to the log
- for(i <- 0 until 10)
+ for (_ <- 0 until 10)
log.append(set)
log.delete()
@@ -857,7 +857,7 @@ class LogTest extends JUnitSuite {
time)
// add enough messages to roll over several segments then close and re-open and attempt to truncate
- for(i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
log.close()
log = new Log(logDir,
@@ -892,7 +892,7 @@ class LogTest extends JUnitSuite {
time)
// append some messages to create some segments
- for(i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
// files should be renamed
@@ -934,7 +934,7 @@ class LogTest extends JUnitSuite {
time)
// append some messages to create some segments
- for(i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
// expire all segments
@@ -986,7 +986,7 @@ class LogTest extends JUnitSuite {
val config = LogConfig(logProps)
val set = TestUtils.singleMessageSet("test".getBytes)
val recoveryPoint = 50L
- for(iteration <- 0 until 50) {
+ for (_ <- 0 until 50) {
// create a log and write some messages to it
logDir.mkdirs()
var log = new Log(logDir,
@@ -995,7 +995,7 @@ class LogTest extends JUnitSuite {
time.scheduler,
time)
val numMessages = 50 + TestUtils.random.nextInt(50)
- for(i <- 0 until numMessages)
+ for (_ <- 0 until numMessages)
log.append(set)
val messages = log.logSegments.flatMap(_.log.iterator.toList)
log.close()
@@ -1033,7 +1033,7 @@ class LogTest extends JUnitSuite {
recoveryPoint = 0L,
time.scheduler,
time)
- for(i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
log.close()
@@ -1062,7 +1062,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1073,7 +1073,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1086,7 +1086,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1099,7 +1099,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1112,7 +1112,7 @@ class LogTest extends JUnitSuite {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
- case e: Exception => // its GOOD!
+ case _: Exception => // its GOOD!
}
}
@@ -1134,7 +1134,7 @@ class LogTest extends JUnitSuite {
time)
// append some messages to create some segments
- for (i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
// expire all segments
@@ -1143,7 +1143,7 @@ class LogTest extends JUnitSuite {
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
// append some messages to create some segments
- for (i <- 0 until 100)
+ for (_ <- 0 until 100)
log.append(set)
log.delete()
@@ -1158,7 +1158,7 @@ class LogTest extends JUnitSuite {
val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 10)
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments
@@ -1171,7 +1171,7 @@ class LogTest extends JUnitSuite {
val log = createLog(set.sizeInBytes, retentionBytes = set.sizeInBytes * 15)
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments
@@ -1184,7 +1184,7 @@ class LogTest extends JUnitSuite {
val log = createLog(set.sizeInBytes, retentionMs = 10000)
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments()
@@ -1197,7 +1197,7 @@ class LogTest extends JUnitSuite {
val log = createLog(set.sizeInBytes, retentionMs = 10000000)
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments()
@@ -1212,7 +1212,7 @@ class LogTest extends JUnitSuite {
cleanupPolicy = "compact")
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
// mark oldest segment as older the retention.ms
@@ -1231,7 +1231,7 @@ class LogTest extends JUnitSuite {
cleanupPolicy = "compact,delete")
// append some messages to create some segments
- for (i <- 0 until 15)
+ for (_ <- 0 until 15)
log.append(set)
log.deleteOldSegments()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 869e618..7618cf7 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -157,7 +157,7 @@ class OffsetIndexTest extends JUnitSuite {
val rand = new Random(1L)
val vals = new mutable.ArrayBuffer[Int](len)
var last = base
- for (i <- 0 until len) {
+ for (_ <- 0 until len) {
last += rand.nextInt(15) + 1
vals += last
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
index a5bec17..b079f25 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -62,7 +62,6 @@ class OffsetMapTest extends JUnitSuite {
val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
for(i <- 0 until items)
map.put(key(i), i)
- var misses = 0
for(i <- 0 until items)
assertEquals(map.get(key(i)), i.toLong)
map
@@ -85,4 +84,4 @@ object OffsetMapTest {
println(map.size + " entries in map of size " + map.slots + " in " + ellapsedMs + " ms")
println("Collision rate: %.1f%%".format(100*map.collisionRate))
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 18a023c..e2cfb87 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -293,7 +293,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
messageTimestampDiffMaxMs = 1000L)
fail("Should throw InvalidMessageException.")
} catch {
- case e: InvalidTimestampException =>
+ case _: InvalidTimestampException =>
}
try {
@@ -306,7 +306,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
messageTimestampDiffMaxMs = 1000L)
fail("Should throw InvalidMessageException.")
} catch {
- case e: InvalidTimestampException =>
+ case _: InvalidTimestampException =>
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 1438523..e8abfe1 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -94,8 +94,7 @@ class MessageCompressionTest extends JUnitSuite {
new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream())
true
} catch {
- case e: UnsatisfiedLinkError => false
- case e: org.xerial.snappy.SnappyError => false
+ case _: UnsatisfiedLinkError | _: org.xerial.snappy.SnappyError => false
}
}
@@ -104,7 +103,7 @@ class MessageCompressionTest extends JUnitSuite {
new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
true
} catch {
- case e: UnsatisfiedLinkError => false
+ case _: UnsatisfiedLinkError => false
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index e60f350..da6f260 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -159,7 +159,7 @@ class SocketServerTest extends JUnitSuite {
outgoing.flush()
receiveResponse(socket)
} catch {
- case e: IOException => // thats fine
+ case _: IOException => // thats fine
}
}
@@ -186,14 +186,14 @@ class SocketServerTest extends JUnitSuite {
sendRequest(plainSocket, largeChunkOfBytes, Some(0))
fail("expected exception when writing to closed plain socket")
} catch {
- case e: IOException => // expected
+ case _: IOException => // expected
}
try {
sendRequest(traceSocket, largeChunkOfBytes, Some(0))
fail("expected exception when writing to closed trace socket")
} catch {
- case e: IOException => // expected
+ case _: IOException => // expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 3088199..3093b93 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -76,7 +76,7 @@ class AsyncProducerTest {
fail("Queue should be full")
}
catch {
- case e: QueueFullException => //expected
+ case _: QueueFullException => //expected
}finally {
producer.close()
}
@@ -96,7 +96,7 @@ class AsyncProducerTest {
fail("should complain that producer is already closed")
}
catch {
- case e: ProducerClosedException => //expected
+ case _: ProducerClosedException => //expected
}
}
@@ -266,7 +266,7 @@ class AsyncProducerTest {
}
catch {
// should not throw any exception
- case e: Throwable => fail("Should not throw any exception")
+ case _: Throwable => fail("Should not throw any exception")
}
}
@@ -298,7 +298,7 @@ class AsyncProducerTest {
fail("Should fail with FailedToSendMessageException")
}
catch {
- case e: FailedToSendMessageException => // we retry on any exception now
+ case _: FailedToSendMessageException => // we retry on any exception now
}
}
@@ -317,8 +317,8 @@ class AsyncProducerTest {
producer.send(getProduceData(1): _*)
fail("Should fail with ClassCastException due to incompatible Encoder")
} catch {
- case e: ClassCastException =>
- }finally {
+ case _: ClassCastException =>
+ } finally {
producer.close()
}
}
@@ -352,9 +352,9 @@ class AsyncProducerTest {
val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
partitionedDataOpt match {
case Some(partitionedData) =>
- for ((brokerId, dataPerBroker) <- partitionedData) {
- for ( (TopicAndPartition(topic, partitionId), dataPerTopic) <- dataPerBroker)
- assertTrue(partitionId == 0)
+ for (dataPerBroker <- partitionedData.values) {
+ for (tp <- dataPerBroker.keys)
+ assertTrue(tp.partition == 0)
}
case None =>
fail("Failed to collate requests by topic, partition")
@@ -461,7 +461,7 @@ class AsyncProducerTest {
fail("should complain about wrong config")
}
catch {
- case e: IllegalArgumentException => //expected
+ case _: IllegalArgumentException => //expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index dc73db3..f4a339e 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -115,15 +115,12 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
keyEncoder = classOf[StringEncoder].getName,
producerProps = props)
- try{
+ try {
producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
fail("Test should fail because the broker list provided are not valid")
} catch {
- case e: FailedToSendMessageException => // this is expected
- case oe: Throwable => fail("fails with exception", oe)
- } finally {
- producer1.close()
- }
+ case _: FailedToSendMessageException => // this is expected
+ } finally producer1.close()
val producer2 = TestUtils.createProducer[String, String](
brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(server1)),
@@ -216,9 +213,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
fail("we don't support request.required.acks greater than 1")
}
catch {
- case iae: IllegalArgumentException => // this is expected
- case e: Throwable => fail("Not expected", e)
-
+ case _: IllegalArgumentException => // this is expected
}
}
@@ -261,7 +256,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
fail("Should fail since no leader exists for the partition.")
} catch {
case e : TestFailedException => throw e // catch and re-throw the failure message
- case e2: Throwable => // otherwise success
+ case _: Throwable => // otherwise success
}
// restart server 1
@@ -290,6 +285,10 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
@Test
def testAsyncSendCanCorrectlyFailWithTimeout() {
+ val topic = "new-topic"
+ // create topics in ZK
+ TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+
val timeoutMs = 500
val props = new Properties()
props.put("request.timeout.ms", String.valueOf(timeoutMs))
@@ -303,10 +302,6 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
partitioner = classOf[StaticPartitioner].getName,
producerProps = props)
- val topic = "new-topic"
- // create topics in ZK
- TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
-
// do a simple test to make sure plumbing is okay
try {
// this message should be assigned to partition 0 whose leader is on broker 0
@@ -315,30 +310,25 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
val messageSet1 = response1.messageSet("new-topic", 0).iterator
assertTrue("Message set should have 1 message", messageSet1.hasNext)
- assertEquals(new Message("test".getBytes), messageSet1.next.message)
- } catch {
- case e: Throwable => case e: Exception => producer.close; fail("Not expected", e)
- }
-
- // stop IO threads and request handling, but leave networking operational
- // any requests should be accepted and queue up, but not handled
- server1.requestHandlerPool.shutdown()
-
- val t1 = SystemTime.milliseconds
- try {
- // this message should be assigned to partition 0 whose leader is on broker 0, but
- // broker 0 will not response within timeoutMs millis.
- producer.send(new KeyedMessage[String, String](topic, "test", "test"))
- } catch {
- case e: FailedToSendMessageException => /* success */
- case e: Exception => fail("Not expected", e)
- } finally {
- producer.close()
- }
- val t2 = SystemTime.milliseconds
-
- // make sure we don't wait fewer than timeoutMs
- assertTrue((t2-t1) >= timeoutMs)
+ assertEquals(ByteBuffer.wrap("test".getBytes), messageSet1.next.message.payload)
+
+ // stop IO threads and request handling, but leave networking operational
+ // any requests should be accepted and queue up, but not handled
+ server1.requestHandlerPool.shutdown()
+
+ val t1 = SystemTime.milliseconds
+ try {
+ // this message should be assigned to partition 0 whose leader is on broker 0, but
+ // broker 0 will not response within timeoutMs millis.
+ producer.send(new KeyedMessage[String, String](topic, "test", "test"))
+ } catch {
+ case _: FailedToSendMessageException => /* success */
+ }
+ val t2 = SystemTime.milliseconds
+ // make sure we don't wait fewer than timeoutMs
+ assertTrue((t2-t1) >= timeoutMs)
+
+ } finally producer.close()
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 968ea4b..7e72eec 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -151,8 +151,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
producer.send(produceRequest("test", 0,
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))), acks = 0))
} catch {
- case e : java.io.IOException => // success
- case e2: Throwable => throw e2
+ case _ : java.io.IOException => // success
}
}
@@ -222,8 +221,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
producer.send(request)
fail("Should have received timeout exception since request handling is stopped.")
} catch {
- case e: SocketTimeoutException => /* success */
- case e: Throwable => fail("Unexpected exception when expecting timeout: " + e)
+ case _: SocketTimeoutException => /* success */
}
val t2 = SystemTime.milliseconds
// make sure we don't wait fewer than timeoutMs for a response
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
index 0a28e8f..1df34ea 100644
--- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
@@ -31,7 +31,7 @@ class OperationTest extends JUnitSuite {
Operation.fromString("badName")
fail("Expected exception on invalid operation name.")
} catch {
- case e: KafkaException => // expected
+ case _: KafkaException => // expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
index 6cd1ad7..1b1c864 100644
--- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
@@ -31,7 +31,7 @@ class PermissionTypeTest extends JUnitSuite {
PermissionType.fromString("badName")
fail("Expected exception on invalid PermissionType name.")
} catch {
- case e: KafkaException => // expected
+ case _: KafkaException => // expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
index 3653a7d..546c92e 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
@@ -31,7 +31,7 @@ class ResourceTypeTest extends JUnitSuite {
ResourceType.fromString("badName")
fail("Expected exception on invalid ResourceType name.")
} catch {
- case e: KafkaException => // expected
+ case _: KafkaException => // expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 1f52af4..0765992 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -347,7 +347,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
// Alternate authorizer to keep adding and removing zookeeper path
- val concurrentFuctions = (0 to 50).map { i =>
+ val concurrentFuctions = (0 to 50).map { _ =>
() => {
simpleAclAuthorizer.addAcls(Set(acl), resource)
simpleAclAuthorizer2.removeAcls(Set(acl), resource)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 9203130..8cec0c7 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -65,7 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
JaasUtils.isZkSecurityEnabled()
fail("Should have thrown an exception")
} catch {
- case e: KafkaException => // Expected
+ case _: KafkaException => // Expected
}
}
@@ -286,7 +286,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
// Fail the test if able to delete
result match {
- case Success(v) => // All done
+ case Success(_) => // All done
case Failure(e) => fail(e.getMessage)
}
}
@@ -302,7 +302,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
case "/" => deleteRecursive(zkUtils, s"/$child")
case path => deleteRecursive(zkUtils, s"$path/$child")
}) match {
- case Success(v) => result
+ case Success(_) => result
case Failure(e) => Failure(e)
}
path match {
@@ -314,7 +314,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
zkUtils.deletePath(path)
Failure(new Exception(s"Have been able to delete $path"))
} catch {
- case e: Exception => result
+ case _: Exception => result
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 291e822..b581341 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -229,7 +229,7 @@ class ClientQuotaManagerTest {
/* We have 10 second windows. Make sure that there is no quota violation
* if we produce under the quota
*/
- for (i <- 0 until 10) {
+ for (_ <- 0 until 10) {
clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
time.sleep(1000)
}
@@ -256,7 +256,7 @@ class ClientQuotaManagerTest {
assertEquals(11, numCallbacks)
// Could continue to see delays until the bursty sample disappears
- for (i <- 0 until 10) {
+ for (_ <- 0 until 10) {
clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, callback)
time.sleep(1000)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index faa23f0..2e50d30 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -168,7 +168,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
fail("Should fail with AdminOperationException for topic doesn't exist")
} catch {
- case e: AdminOperationException => // expected
+ case _: AdminOperationException => // expected
}
}
@@ -198,7 +198,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
- case t: Throwable =>
+ case _: Throwable =>
}
// Version is provided. EntityType is incorrect
try {
@@ -207,7 +207,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
- case t: Throwable =>
+ case _: Throwable =>
}
// EntityName isn't provided
@@ -217,7 +217,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
}
catch {
- case t: Throwable =>
+ case _: Throwable =>
}
// Everything is provided
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 540a665..0051247 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.message.MessageSet
import org.apache.kafka.common.utils.{MockTime => JMockTime}
-
class IsrExpirationTest {
var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
@@ -52,7 +51,8 @@ class IsrExpirationTest {
@Before
def setUp() {
- replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
+ replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false),
+ QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
}
@After
@@ -66,7 +66,7 @@ class IsrExpirationTest {
*/
@Test
def testIsrExpirationForStuckFollowers() {
- val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L
+ val log = logMock
// create one partition and all replicas
val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
@@ -75,8 +75,7 @@ class IsrExpirationTest {
// let the follower catch up to the Leader logEndOffset (15)
(partition0.assignedReplicas() - leaderReplica).foreach(
- r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
- MessageSet.Empty),
+ r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty),
-1L,
-1,
true)))
@@ -97,7 +96,7 @@ class IsrExpirationTest {
*/
@Test
def testIsrExpirationIfNoFetchRequestMade() {
- val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L
+ val log = logMock
// create one partition and all replicas
val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
@@ -119,7 +118,7 @@ class IsrExpirationTest {
@Test
def testIsrExpirationForSlowFollowers() {
// create leader replica
- val log = getLogWithLogEndOffset(15L, 4)
+ val log = logMock
// add one partition
val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
@@ -158,7 +157,7 @@ class IsrExpirationTest {
private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
localLog: Log): Partition = {
- val leaderId=config.brokerId
+ val leaderId = config.brokerId
val partition = replicaManager.getOrCreatePartition(topic, partitionId)
val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
@@ -171,12 +170,10 @@ class IsrExpirationTest {
partition
}
- private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = {
- val log1 = EasyMock.createMock(classOf[kafka.log.Log])
- EasyMock.expect(log1.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(logEndOffset)).times(expectedCalls)
- EasyMock.replay(log1)
-
- log1
+ private def logMock: Log = {
+ val log = EasyMock.createMock(classOf[kafka.log.Log])
+ EasyMock.replay(log)
+ log
}
private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 042dabf..93d0413 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -112,7 +112,7 @@ class KafkaConfigTest {
props5.put("log.retention.ms", "0")
intercept[IllegalArgumentException] {
- val cfg5 = KafkaConfig.fromProps(props5)
+ KafkaConfig.fromProps(props5)
}
}
@@ -127,13 +127,13 @@ class KafkaConfigTest {
props3.put("log.retention.hours", "0")
intercept[IllegalArgumentException] {
- val cfg1 = KafkaConfig.fromProps(props1)
+ KafkaConfig.fromProps(props1)
}
intercept[IllegalArgumentException] {
- val cfg2 = KafkaConfig.fromProps(props2)
+ KafkaConfig.fromProps(props2)
}
intercept[IllegalArgumentException] {
- val cfg3 = KafkaConfig.fromProps(props3)
+ KafkaConfig.fromProps(props3)
}
}
@@ -303,7 +303,7 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props)
true
} catch {
- case e: IllegalArgumentException => false
+ case _: IllegalArgumentException => false
}
}
@@ -561,8 +561,7 @@ class KafkaConfigTest {
case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
-
- case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
+ case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 0885709..0f3ee63 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -90,7 +90,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val log = logManager.getLog(TopicAndPartition(topic, part)).get
val message = new Message(Integer.toString(42).getBytes())
- for(i <- 0 until 20)
+ for (_ <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
log.flush()
@@ -125,7 +125,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
var offsetChanged = false
- for(i <- 1 to 14) {
+ for (_ <- 1 to 14) {
val topicAndPartition = TopicAndPartition(topic, 0)
val offsetRequest =
OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
@@ -151,7 +151,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val logManager = server.getLogManager
val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
- for(i <- 0 until 20)
+ for (_ <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
log.flush()
@@ -180,7 +180,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val logManager = server.getLogManager
val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
val message = new Message(Integer.toString(42).getBytes())
- for(i <- 0 until 20)
+ for (_ <- 0 until 20)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
log.flush()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index b34c93d..138c36d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -267,7 +267,7 @@ class MetadataCacheTest {
fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead")
}
catch {
- case e: BrokerEndPointNotAvailableException => //expected
+ case _: BrokerEndPointNotAvailableException => //expected
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 29eaf2d..64c67d6 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -296,7 +296,6 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
createTopic(zkUtils, topic2, servers = Seq(server), numPartitions = 1)
// Commit an offset
- val expectedReplicaAssignment = Map(0 -> List(1))
val commitRequest = OffsetCommitRequest(group, immutable.Map(
TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L),
TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 825b2b4..3a09737 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -120,7 +120,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
//Add data equally to each partition
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
- (0 until msgCount).foreach { x =>
+ (0 until msgCount).foreach { _ =>
(0 to 7).foreach { partition =>
producer.send(new ProducerRecord(topic, partition, null, msg))
}
@@ -215,7 +215,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
def addData(msgCount: Int, msg: Array[Byte]): Boolean = {
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 0)
- (0 until msgCount).foreach { x => producer.send(new ProducerRecord(topic, msg)).get }
+ (0 until msgCount).map(_ => producer.send(new ProducerRecord(topic, msg))).foreach(_.get)
waitForOffsetsToMatch(msgCount, 0, 100)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 4e4fb95..9e19e39 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -56,7 +56,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0)
fail("Versions Request during Sasl handshake did not fail")
} catch {
- case ioe: IOException => // expected exception
+ case _: IOException => // expected exception
}
} finally {
plaintextSocket.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index e0b6db4..f21f2de 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -130,7 +130,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
try {
server1.startup()
} catch {
- case e: kafka.common.InconsistentBrokerIdException => //success
+ case _: kafka.common.InconsistentBrokerIdException => //success
}
server1.shutdown()
CoreUtils.delete(server1.config.logDirs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index bc71edd..fd0a460 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -129,16 +129,14 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
val server = new KafkaServer(newConfig, threadNamePrefix = Option(this.getClass.getName))
try {
server.startup()
- fail("Expected KafkaServer setup to fail, throw exception")
+ fail("Expected KafkaServer setup to fail and throw exception")
}
catch {
// Try to clean up carefully without hanging even if the test fails. This means trying to accurately
// identify the correct exception, making sure the server was shutdown, and cleaning up if anything
// goes wrong so that awaitShutdown doesn't hang
- case e: org.I0Itec.zkclient.exception.ZkException =>
+ case _: org.I0Itec.zkclient.exception.ZkException =>
assertEquals(NotRunning.state, server.brokerState.currentState)
- case e: Throwable =>
- fail("Expected ZkException during Kafka server starting up but caught a different exception %s".format(e.toString))
}
finally {
if (server.brokerState.currentState != NotRunning.state)
@@ -170,7 +168,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
assertTrue(true)
}
catch{
- case ex: Throwable => fail()
+ case _: Throwable => fail()
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index b5560c3..86b167e 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -58,7 +58,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
TestUtils.createServer(KafkaConfig.fromProps(props2))
fail("Registering a broker with a conflicting id should fail")
} catch {
- case e : RuntimeException =>
+ case _: RuntimeException =>
// this is expected
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index 741eec9..da80c0d 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -62,7 +62,7 @@ class ConsoleProducerTest {
new ConsoleProducer.ProducerConfig(invalidArgs)
Assert.fail("Should have thrown an UnrecognizedOptionException")
} catch {
- case e: joptsimple.OptionException => // expected exception
+ case _: joptsimple.OptionException => // expected exception
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 3796e48..83fd6b8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -822,14 +822,14 @@ object TestUtils extends Logging {
def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
val file = new RandomAccessFile(fileName, "rw")
file.seek(position)
- for(i <- 0 until size)
+ for (_ <- 0 until size)
file.writeByte(random.nextInt(255))
file.close()
}
def appendNonsenseToFile(fileName: File, size: Int) {
val file = new FileOutputStream(fileName, true)
- for(i <- 0 until size)
+ for (_ <- 0 until size)
file.write(random.nextInt(255))
file.close()
}
@@ -984,7 +984,7 @@ object TestUtils extends Logging {
var messages: List[String] = Nil
val shouldGetAllMessages = nMessagesPerThread < 0
- for ((topic, messageStreams) <- topicMessageStreams) {
+ for (messageStreams <- topicMessageStreams.values) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator()
try {
@@ -1117,7 +1117,7 @@ object TestUtils extends Logging {
}
}
} catch {
- case ie: InterruptedException => failWithTimeout()
+ case _: InterruptedException => failWithTimeout()
case e: Throwable => exceptions += e
} finally {
threadPool.shutdownNow()
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
index 29c9067..64129e9 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
@@ -35,8 +35,6 @@ class TimerTaskListTest {
@Test
def testAll() {
val sharedCounter = new AtomicInteger(0)
- val runCounter = new AtomicInteger(0)
- val execCounter = new AtomicInteger(0)
val list1 = new TimerTaskList(sharedCounter)
val list2 = new TimerTaskList(sharedCounter)
val list3 = new TimerTaskList(sharedCounter)
@@ -46,7 +44,7 @@ class TimerTaskListTest {
list1.add(new TimerTaskEntry(task, 10L))
assertEquals(i, sharedCounter.get)
task
- }.toSeq
+ }
assertEquals(tasks.size, sharedCounter.get)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index c2c25ed..4d57ed9 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -80,7 +80,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
try {
zkUtils.createEphemeralPathExpectConflict("/tmp/zktest", "node created")
} catch {
- case e: Exception =>
+ case _: Exception =>
}
var testData: String = null
@@ -147,7 +147,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
zwe.create()
false
} catch {
- case e: ZkNodeExistsException => true
+ case _: ZkNodeExistsException => true
}
Assert.assertTrue(gotException)
zkClient2.close()
@@ -155,7 +155,6 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
/**
* Tests if succeeds with znode from the same session
- *
*/
@Test
def testSameSession = {
@@ -171,7 +170,7 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
zwe.create()
false
} catch {
- case e: ZkNodeExistsException => true
+ case _: ZkNodeExistsException => true
}
Assert.assertFalse(gotException)
}
|