kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3765; Kafka Code style corrections
Date Sun, 29 May 2016 08:01:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0aff45096 -> 404b696be


KAFKA-3765; Kafka Code style corrections

Removed explicit returns, not needed parentheses, corrected variables, removed unused imports
Using isEmpty/nonEmpty  instead of size check, using head, flatmap instead of map-flatten

Author: Joshi <rekhajoshm@gmail.com>
Author: Rekha Joshi <rekhajoshm@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1442 from rekhajoshm/KAFKA-3765


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/404b696b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/404b696b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/404b696b

Branch: refs/heads/trunk
Commit: 404b696bea58aca17fbe528aed03cb3c94516c39
Parents: 0aff450
Author: Joshi <rekhajoshm@gmail.com>
Authored: Sun May 29 09:01:20 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sun May 29 09:01:20 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AclCommand.scala      |  2 +-
 core/src/main/scala/kafka/admin/AdminClient.scala     | 14 +++++++-------
 core/src/main/scala/kafka/admin/AdminUtils.scala      | 12 ++++++------
 .../scala/kafka/admin/ReassignPartitionsCommand.scala |  2 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala    |  4 ++--
 .../main/scala/kafka/admin/ZkSecurityMigrator.scala   | 14 ++++++--------
 .../scala/kafka/api/ControlledShutdownRequest.scala   |  2 +-
 core/src/main/scala/kafka/log/FileMessageSet.scala    |  4 ++--
 core/src/main/scala/kafka/log/Log.scala               |  4 ++--
 core/src/main/scala/kafka/log/LogCleaner.scala        |  4 ++--
 core/src/main/scala/kafka/log/LogCleanerManager.scala |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala        |  5 ++---
 core/src/main/scala/kafka/log/OffsetIndex.scala       |  8 ++++++--
 core/src/main/scala/kafka/tools/ConsoleProducer.scala |  2 +-
 core/src/main/scala/kafka/tools/GetOffsetShell.scala  |  2 +-
 core/src/main/scala/kafka/tools/JmxTool.scala         |  6 +++---
 core/src/main/scala/kafka/tools/MirrorMaker.scala     |  2 +-
 17 files changed, 45 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 966c4be..080f809 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -117,7 +117,7 @@ object AclCommand {
 
       val resourceToAcls: Iterable[(Resource, Set[Acl])] =
         if (resources.isEmpty) authorizer.getAcls()
-        else resources.map(resource => (resource -> authorizer.getAcls(resource)))
+        else resources.map(resource => resource -> authorizer.getAcls(resource))
 
       for ((resource, acls) <- resourceToAcls)
         println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)}
$Newline")

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 8572ceb..556a02b 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -49,7 +49,7 @@ class AdminClient(val time: Time,
     client.poll(future)
 
     if (future.succeeded())
-      return future.value().responseBody()
+      future.value().responseBody()
     else
       throw future.exception()
   }
@@ -61,10 +61,10 @@ class AdminClient(val time: Time,
           return send(broker, api, request)
         } catch {
           case e: Exception =>
-            debug(s"Request ${api} failed against node ${broker}", e)
+            debug(s"Request $api failed against node $broker", e)
         }
     }
-    throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}")
+    throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
   }
 
   private def findCoordinator(groupId: String): Node = {
@@ -88,7 +88,7 @@ class AdminClient(val time: Time,
     val response = new MetadataResponse(responseBody)
     val errors = response.errors()
     if (!errors.isEmpty)
-      debug(s"Metadata request contained errors: ${errors}")
+      debug(s"Metadata request contained errors: $errors")
     response.cluster().nodes().asScala.toList
   }
 
@@ -100,7 +100,7 @@ class AdminClient(val time: Time,
             listGroups(broker)
           } catch {
             case e: Exception =>
-              debug(s"Failed to find groups from broker ${broker}", e)
+              debug(s"Failed to find groups from broker $broker", e)
               List[GroupOverview]()
           }
         }
@@ -127,7 +127,7 @@ class AdminClient(val time: Time,
     val response = new DescribeGroupsResponse(responseBody)
     val metadata = response.groups().get(groupId)
     if (metadata == null)
-      throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
+      throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
 
     Errors.forCode(metadata.errorCode()).maybeThrow()
     val members = metadata.members().map { member =>
@@ -149,7 +149,7 @@ class AdminClient(val time: Time,
       return None
 
     if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
-      throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}'
is not a valid consumer group")
+      throw new IllegalArgumentException(s"Group $groupId with protocol type '${group.protocolType}'
is not a valid consumer group")
 
     if (group.state == "Stable") {
       Some(group.members.map { member =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a8a282e..53b6dd7 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -250,7 +250,7 @@ object AdminUtils extends Logging {
                     checkBrokerAvailable: Boolean = true,
                     rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
     val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
-    if (existingPartitionsReplicaList.size == 0)
+    if (existingPartitionsReplicaList.isEmpty)
       throw new AdminOperationException("The topic %s does not exist".format(topic))
 
     val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p =>
p._1.partition == 0) match {
@@ -274,8 +274,8 @@ object AdminUtils extends Logging {
           existingPartitionsReplicaList.size, checkBrokerAvailable)
 
     // check if manual assignment has the right replication factor
-    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size !=
existingReplicaListForPartitionZero.size))
-    if (unmatchedRepFactorList.size != 0)
+    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size !=
existingReplicaListForPartitionZero.size)
+    if (unmatchedRepFactorList.nonEmpty)
       throw new AdminOperationException("The replication factor in manual replication assignment
" +
         " is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size)
 
@@ -291,9 +291,9 @@ object AdminUtils extends Logging {
     val ret = new mutable.HashMap[Int, List[Int]]()
     var partitionId = startPartitionId
     partitionList = partitionList.takeRight(partitionList.size - partitionId)
-    for (i <- 0 until partitionList.size) {
+    for (i <- partitionList.indices) {
       val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
-      if (brokerList.size <= 0)
+      if (brokerList.isEmpty)
         throw new AdminOperationException("replication factor must be larger than 0")
       if (brokerList.size != brokerList.toSet.size)
         throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
@@ -443,7 +443,7 @@ object AdminUtils extends Logging {
   private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment:
Map[Int, Seq[Int]], update: Boolean) {
     try {
       val zkPath = getTopicPath(topic)
-      val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e =>
(e._1.toString -> e._2)))
+      val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e =>
e._1.toString -> e._2))
 
       if (!update) {
         info("Topic creation " + jsonPartitionData.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 1bf351a..fae0a40 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -113,7 +113,7 @@ object ReassignPartitionsCommand extends Logging {
       val (_, replicas) = assignment.head
       val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size,
replicas.size)
       partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
-        (TopicAndPartition(topic, partition) -> replicas)
+        TopicAndPartition(topic, partition) -> replicas
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index e6ebb96..c643a9d 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -118,7 +118,7 @@ object TopicCommand extends Logging {
   def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
     val topics = getTopics(zkUtils, opts)
     val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
-    if (topics.length == 0 && !ifExists) {
+    if (topics.isEmpty && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
           opts.options.valueOf(opts.zkConnectOpt)))
     }
@@ -165,7 +165,7 @@ object TopicCommand extends Logging {
   def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
     val topics = getTopics(zkUtils, opts)
     val ifExists = if (opts.options.has(opts.ifExistsOpt)) true else false
-    if (topics.length == 0 && !ifExists) {
+    if (topics.isEmpty && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
           opts.options.valueOf(opts.zkConnectOpt)))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 2080879..a87e5b7 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -18,20 +18,18 @@
 package kafka.admin
 
 import java.util.concurrent.LinkedBlockingQueue
-import java.util.concurrent.ThreadPoolExecutor
-import java.util.concurrent.TimeUnit
+
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.exception.ZkException
-import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
-import org.apache.log4j.Level
+import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
 import org.apache.zookeeper.data.Stat
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
+
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
-import scala.collection._
 import scala.collection.mutable.Queue
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -83,9 +81,9 @@ object ZkSecurityMigrator extends Logging {
     if (options.has(helpOpt))
       CommandLineUtils.printUsageAndDie(parser, usageMessage)
 
-    if ((jaasFile == null)) {
-     val errorMsg = ("No JAAS configuration file has been specified. Please make sure that
you have set " +
-                    "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
+    if (jaasFile == null) {
+     val errorMsg = "No JAAS configuration file has been specified. Please make sure that
you have set " +
+       "the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
      System.out.println("ERROR: %s".format(errorMsg))
      throw new IllegalArgumentException("Incorrect configuration")
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index b875e3e..42a17e6 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -19,7 +19,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 
-import kafka.common.{TopicAndPartition}
+import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index a164b4b..a454f2c 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -83,7 +83,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
       this(file,
         channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize,
preallocate),
         start = 0,
-        end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
+        end = if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue,
         isSlice = false)
 
   /**
@@ -224,7 +224,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
       }
     }
 
-    if (sizeInBytes > 0 && newMessages.size == 0) {
+    if (sizeInBytes > 0 && newMessages.isEmpty) {
       // This indicates that the message is too large. We just return all the bytes in the
file message set.
       this
     } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a7549dc..62dc7a1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -35,7 +35,7 @@ import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.Utils
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec,
NoCompressionCodec, -1, -1, false)
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec,
NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
@@ -228,7 +228,7 @@ class Log(val dir: File,
       replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
     }
 
-    if(logSegments.size == 0) {
+    if(logSegments.isEmpty) {
       // no existing segments, create a new mutable segment beginning at offset 0
       segments.put(0L, new LogSegment(dir = dir,
                                      startOffset = 0,

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c6636be..4c0db0d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -578,12 +578,12 @@ private[log] class Cleaner(val id: Int,
   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize:
Int): List[Seq[LogSegment]] = {
     var grouped = List[List[LogSegment]]()
     var segs = segments.toList
-    while(!segs.isEmpty) {
+    while(segs.nonEmpty) {
       var group = List(segs.head)
       var logSize = segs.head.size
       var indexSize = segs.head.index.sizeInBytes
       segs = segs.tail
-      while(!segs.isEmpty &&
+      while(segs.nonEmpty &&
             logSize + segs.head.size <= maxSize &&
             indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
             segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue)
{

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f92db4e..72757c0 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -100,7 +100,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs:
Pool[To
           LogToClean(topicAndPartition, log, firstDirtyOffset)
       }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
-      this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio
else 0
+      this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio
else 0
       // and must meet the minimum threshold for dirty byte ratio
       val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
       if(cleanableLogs.isEmpty) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 749c622..4357ef4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -132,10 +132,9 @@ class LogManager(val logDirs: Array[File],
       try {
         recoveryPoints = this.recoveryPointCheckpoints(dir).read
       } catch {
-        case e: Exception => {
+        case e: Exception =>
           warn("Error occured while reading recovery-point-offset-checkpoint file of directory
" + dir, e)
           warn("Resetting the recovery checkpoint to 0")
-        }
       }
 
       val jobsForDir = for {
@@ -282,7 +281,7 @@ class LogManager(val logDirs: Array[File],
       // If the log does not exist, skip it
       if (log != null) {
         //May need to abort and pause the cleaning of the log, and resume after truncation
is done.
-        val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset)
+        val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset
         if (needToStopCleaner && cleaner != null)
           cleaner.abortAndPauseCleaning(topicAndPartition)
         log.truncateTo(truncateOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index ce35d68..f432732 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -24,9 +24,11 @@ import java.io._
 import java.nio._
 import java.nio.channels._
 import java.util.concurrent.locks._
+
 import kafka.utils._
 import kafka.utils.CoreUtils.inLock
 import kafka.common.InvalidOffsetException
+import sun.nio.ch.DirectBuffer
 
 /**
  * An index that maps offsets to physical file locations for a particular log segment. This
index may be sparse:
@@ -306,8 +308,10 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset:
Long,
    */
   private def forceUnmap(m: MappedByteBuffer) {
     try {
-      if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
-        (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
+      m match {
+        case buffer: DirectBuffer => buffer.cleaner().clean()
+        case _ =>
+      }
     } catch {
       case t: Throwable => warn("Error when freeing index buffer", t)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index e647601..4cc7c20 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -311,7 +311,7 @@ object ConsoleProducer {
           line.indexOf(keySeparator) match {
             case -1 =>
               if (ignoreError) new ProducerRecord(topic, line.getBytes)
-              else throw new KafkaException(s"No key found on line ${lineNumber}: $line")
+              else throw new KafkaException(s"No key found on line $lineNumber: $line")
             case n =>
               val value = (if (n + keySeparator.size > line.size) "" else line.substring(n
+ keySeparator.size)).getBytes
               new ProducerRecord(topic, line.substring(0, n).getBytes, value)

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 30c7afe..f7207ec 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -77,7 +77,7 @@ object GetOffsetShell {
     val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
 
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers,
clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+    if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the
topic does not exist, run ").format(topic) +
         "kafka-list-topic.sh to verify")
       System.exit(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index bd7ca0e..8112f9e 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -89,7 +89,7 @@ object JmxTool extends Logging {
       else
         List(null)
 
-    val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
+    val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName])
 
     val numExpectedAttributes: Map[ObjectName, Int] =
       attributesWhitelistExists match {
@@ -101,7 +101,7 @@ object JmxTool extends Logging {
 
     // print csv header
     val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
-    if(keys.size == numExpectedAttributes.map(_._2).sum + 1)
+    if(keys.size == numExpectedAttributes.values.sum + 1)
       println(keys.map("\"" + _ + "\"").mkString(","))
 
     while(true) {
@@ -111,7 +111,7 @@ object JmxTool extends Logging {
         case Some(dFormat) => dFormat.format(new Date)
         case None => System.currentTimeMillis().toString
       }
-      if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1)
+      if(attributes.keySet.size == numExpectedAttributes.values.sum + 1)
         println(keys.map(attributes(_)).mkString(","))
       val sleep = max(0, interval - (System.currentTimeMillis - start))
       Thread.sleep(sleep)

http://git-wip-us.apache.org/repos/asf/kafka/blob/404b696b/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 87f3cc5..9d5f7e6 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -494,7 +494,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       // Creating one stream per each connector instance
       val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(),
new DefaultDecoder())
       require(streams.size == 1)
-      val stream = streams(0)
+      val stream = streams.head
       iter = stream.iterator()
     }
 


Mime
View raw message