kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4899; Fix findbugs warnings in kafka-core
Date Fri, 07 Apr 2017 11:40:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 359a68510 -> 96ac0f307


KAFKA-4899; Fix findbugs warnings in kafka-core

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Jozef Koval <jozef.koval@protonmail.ch>, Ismael Juma <ismael@juma.me.uk>

Closes #2687 from cmccabe/KAFKA-4899


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96ac0f30
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96ac0f30
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96ac0f30

Branch: refs/heads/trunk
Commit: 96ac0f3076271b7a867a1e94964893da78f2a24d
Parents: 359a685
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Fri Apr 7 12:29:48 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Apr 7 12:29:48 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  4 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |  1 -
 .../main/scala/kafka/consumer/TopicCount.scala  | 11 +--
 .../main/scala/kafka/javaapi/FetchRequest.scala | 12 +--
 .../scala/kafka/javaapi/FetchResponse.scala     | 11 ++-
 .../javaapi/GroupCoordinatorResponse.scala      | 11 ++-
 .../kafka/javaapi/OffsetCommitRequest.scala     | 16 ++--
 .../kafka/javaapi/OffsetFetchRequest.scala      | 16 ++--
 .../scala/kafka/javaapi/OffsetRequest.scala     | 17 ++--
 .../scala/kafka/javaapi/OffsetResponse.scala    | 16 ++--
 .../kafka/javaapi/TopicMetadataResponse.scala   | 11 ++-
 core/src/main/scala/kafka/log/Log.scala         | 19 ++--
 core/src/main/scala/kafka/log/LogManager.scala  |  5 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  9 +-
 .../kafka/metrics/KafkaCSVMetricsReporter.scala |  3 +-
 .../kafka/server/BrokerMetadataCheckpoint.scala | 14 ++-
 .../scala/kafka/server/ClientQuotaManager.scala |  2 +-
 .../server/checkpoints/CheckpointFile.scala     | 10 +-
 .../scala/kafka/tools/ImportZkOffsets.scala     | 31 ++++---
 .../main/scala/kafka/tools/MirrorMaker.scala    | 17 ++--
 core/src/main/scala/kafka/utils/FileLock.scala  |  7 +-
 gradle/findbugs-exclude.xml                     | 98 ++++++++++++++++++--
 22 files changed, 210 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index d4ae4ff..c5d7f12 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -307,8 +307,8 @@ object AdminUtils extends Logging with AdminUtilities {
       if (brokerList.size != brokerList.toSet.size)
         throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
       if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList))
-        throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString +
-          "available broker:" + availableBrokerList.toString)
+        throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList +
+          "available broker:" + availableBrokerList)
       ret.put(partitionId, brokerList.toList)
       if (ret(partitionId).size != ret(startPartitionId).size)
       throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 8d712f4..ec60220 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -83,7 +83,6 @@ class ConsumerFetcherThread(name: String,
   def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
     val startTimestamp = config.autoOffsetReset match {
       case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
-      case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
       case _ => OffsetRequest.LatestTime
     }
     val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index eb035f2..4ca8b2c 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -104,6 +104,9 @@ private[kafka] object TopicCount extends Logging {
   def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkUtils: ZkUtils, excludeInternalTopics: Boolean) =
     new WildcardTopicCount(zkUtils, consumerIdString, filter, numStreams, excludeInternalTopics)
 
+  override def equals(o: Any): Boolean = {
+    throw new KafkaException("can't use equals here")
+  }
 }
 
 private[kafka] class StaticTopicCount(val consumerIdString: String,
@@ -112,14 +115,6 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
 
   def getConsumerThreadIdsPerTopic = TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
 
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case null => false
-      case n: StaticTopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
-      case _ => false
-    }
-  }
-
   def getTopicCountMap = topicCountMap
 
   def pattern = TopicCount.staticPattern

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index fb9fa8e..fe8beaa 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -57,14 +57,14 @@ class FetchRequest(correlationId: Int,
 
   override def toString = underlying.toString
 
-  override def equals(other: Any) = canEqual(other) && {
-    val otherFetchRequest = other.asInstanceOf[kafka.javaapi.FetchRequest]
-    this.underlying.equals(otherFetchRequest.underlying)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case other: FetchRequest => this.underlying.equals(other.underlying)
+      case _ => false
+    }
   }
 
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchRequest]
-
   override def hashCode = underlying.hashCode
-
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/javaapi/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchResponse.scala b/core/src/main/scala/kafka/javaapi/FetchResponse.scala
index 9c67dd8..c916555 100644
--- a/core/src/main/scala/kafka/javaapi/FetchResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchResponse.scala
@@ -32,12 +32,13 @@ class FetchResponse(private val underlying: kafka.api.FetchResponse) {
 
   def errorCode(topic: String, partition: Int) = error(topic, partition).code
 
-  override def equals(other: Any) = canEqual(other) && {
-    val otherFetchResponse = other.asInstanceOf[kafka.javaapi.FetchResponse]
-    this.underlying.equals(otherFetchResponse.underlying)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case other: FetchResponse => this.underlying.equals(other.underlying)
+      case _ => false
+    }
   }
 
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchResponse]
-
   override def hashCode = underlying.hashCode
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
index 9871ca0..096941c 100644
--- a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
@@ -31,13 +31,14 @@ class GroupCoordinatorResponse(private val underlying: kafka.api.GroupCoordinato
     underlying.coordinatorOpt
   }
 
-  override def equals(other: Any) = canEqual(other) && {
-    val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
-    this.underlying.equals(otherConsumerMetadataResponse.underlying)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case other: GroupCoordinatorResponse => this.underlying.equals(other.underlying)
+      case _ => false
+    }
   }
 
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
-
   override def hashCode = underlying.hashCode
 
   override def toString = underlying.toString

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 1924d5e..0c3c651 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -45,19 +45,15 @@ class OffsetCommitRequest(groupId: String,
     this(groupId, requestInfo, correlationId, clientId, 0)
   }
 
-
   override def toString = underlying.toString
 
-
-  override def equals(other: Any) = canEqual(other) && {
-    val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetCommitRequest]
-    this.underlying.equals(otherOffsetRequest.underlying)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case other: OffsetCommitRequest => this.underlying.equals(other.underlying)
+      case _ => false
+    }
   }
 
-
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetCommitRequest]
-
-
   override def hashCode = underlying.hashCode
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
index 8eb0d47..5f96439 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
@@ -44,21 +44,17 @@ class OffsetFetchRequest(groupId: String,
     )
   }
 
-
   override def toString = underlying.toString
 
-
-  override def equals(other: Any) = canEqual(other) && {
-    val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetFetchRequest]
-    this.underlying.equals(otherOffsetRequest.underlying)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case other: OffsetFetchRequest => this.underlying.equals(other.underlying)
+      case _ => false
+    }
   }
 
-
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetFetchRequest]
-
-
   override def hashCode = underlying.hashCode
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index 21997d3..96b66ef 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -36,20 +36,15 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
     )
   }
 
-
-
   override def toString = underlying.toString
 
-
-  override def equals(other: Any) = canEqual(other) && {
-    val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetRequest]
-    this.underlying.equals(otherOffsetRequest.underlying)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case other: OffsetRequest => this.underlying.equals(other.underlying)
+      case _ => false
+    }
   }
 
-
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetRequest]
-
-
   override def hashCode = underlying.hashCode
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
index 42ee2ab..cb2047f 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
@@ -31,19 +31,15 @@ class OffsetResponse(private val underlying: kafka.api.OffsetResponse) {
   def offsets(topic: String, partition: Int) =
     underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).offsets.toArray
 
-
-  override def equals(other: Any) = canEqual(other) && {
-    val otherOffsetResponse = other.asInstanceOf[kafka.javaapi.OffsetResponse]
-    this.underlying.equals(otherOffsetResponse.underlying)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case other: OffsetResponse => this.underlying.equals(other.underlying)
+      case _ => false
+    }
   }
 
-
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetResponse]
-
-
   override def hashCode = underlying.hashCode
 
-
   override def toString = underlying.toString
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
index 3359060..40f81d5 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
@@ -24,13 +24,14 @@ class TopicMetadataResponse(private val underlying: kafka.api.TopicMetadataRespo
     underlying.topicsMetadata
   }
 
-  override def equals(other: Any) = canEqual(other) && {
-    val otherTopicMetadataResponse = other.asInstanceOf[kafka.javaapi.TopicMetadataResponse]
-    this.underlying.equals(otherTopicMetadataResponse.underlying)
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case other: TopicMetadataResponse => this.underlying.equals(other.underlying)
+      case _ => false
+    }
   }
 
-  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.TopicMetadataResponse]
-
   override def hashCode = underlying.hashCode
 
   override def toString = underlying.toString

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 999c6aa..0e8cda8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -18,22 +18,19 @@
 package kafka.log
 
 import java.io.{File, IOException}
+import java.nio.file.Files
 import java.text.NumberFormat
 import java.util.concurrent.atomic._
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
 
-import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_10_0_IV0
 import kafka.common._
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ListOffsetRequest
-import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Seq, mutable}
@@ -210,7 +207,7 @@ class Log(@volatile var dir: File,
 
   private def initializeLeaderEpochCache(): LeaderEpochCache = {
     // create the log directory if it doesn't exist
-    dir.mkdirs()
+    Files.createDirectories(dir.toPath)
     new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
       new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir)))
   }
@@ -227,18 +224,18 @@ class Log(@volatile var dir: File,
       val filename = file.getName
       if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
         // if the file ends in .deleted or .cleaned, delete it
-        file.delete()
+        Files.deleteIfExists(file.toPath)
       } else if(filename.endsWith(SwapFileSuffix)) {
         // we crashed in the middle of a swap operation, to recover:
         // if a log, delete the .index file, complete the swap operation later
         // if an index just delete it, it will be rebuilt
         val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
         if(baseName.getPath.endsWith(IndexFileSuffix)) {
-          file.delete()
+          Files.deleteIfExists(file.toPath)
         } else if(baseName.getPath.endsWith(LogFileSuffix)){
           // delete the index
           val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
-          index.delete()
+          Files.deleteIfExists(index.toPath())
           swapFiles += file
         }
       }
@@ -257,7 +254,7 @@ class Log(@volatile var dir: File,
 
         if(!logFile.exists) {
           warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
-          file.delete()
+          Files.deleteIfExists(file.toPath)
         }
       } else if(filename.endsWith(LogFileSuffix)) {
         // if its a log file, load the corresponding log segment
@@ -286,8 +283,8 @@ class Log(@volatile var dir: File,
             case e: java.lang.IllegalArgumentException =>
               warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
                 s"${indexFile.getAbsolutePath} and rebuilding index...")
-              indexFile.delete()
-              timeIndexFile.delete()
+              Files.deleteIfExists(timeIndexFile.toPath)
+              Files.delete(indexFile.toPath)
               segment.recover(config.maxMessageSize)
           }
         } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 469c46b..a9398f0 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -18,6 +18,7 @@
 package kafka.log
 
 import java.io._
+import java.nio.file.Files
 import java.util.concurrent._
 
 import kafka.admin.AdminUtils
@@ -295,7 +296,7 @@ class LogManager(val logDirs: Array[File],
 
         // mark that the shutdown was clean by creating marker file
         debug("Writing clean shutdown marker at " + dir)
-        CoreUtils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())
+        CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath))
       }
     } catch {
       case e: ExecutionException => {
@@ -408,7 +409,7 @@ class LogManager(val logDirs: Array[File],
       getLog(topicPartition).getOrElse {
         val dataDir = nextLogDir()
         val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
-        dir.mkdirs()
+        Files.createDirectories(dir.toPath)
 
         val log = new Log(
           dir = dir,

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index b77be34..4f055a6 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -17,6 +17,8 @@
 package kafka.log
 
 import java.io.{File, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
 
 import kafka.common._
@@ -468,9 +470,10 @@ class LogSegment(val log: FileRecords,
    * Change the last modified time for this log segment
    */
   def lastModified_=(ms: Long) = {
-    log.file.setLastModified(ms)
-    index.file.setLastModified(ms)
-    timeIndex.file.setLastModified(ms)
+    val fileTime = FileTime.fromMillis(ms)
+    Files.setLastModifiedTime(log.file.toPath, fileTime)
+    Files.setLastModifiedTime(index.file.toPath, fileTime)
+    Files.setLastModifiedTime(timeIndex.file.toPath, fileTime)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index 686f692..81c20a7 100755
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -22,6 +22,7 @@ package kafka.metrics
 
 import com.yammer.metrics.Metrics
 import java.io.File
+import java.nio.file.Files
 
 import com.yammer.metrics.reporting.CsvReporter
 import java.util.concurrent.TimeUnit
@@ -50,7 +51,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
         val metricsConfig = new KafkaMetricsConfig(props)
         csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
         Utils.delete(csvDir)
-        csvDir.mkdirs()
+        Files.createDirectories(csvDir.toPath())
         underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
         if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
           initialized = true

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index cc2c4cd..8630026 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.io._
+import java.nio.file.Files
 import java.util.Properties
 import kafka.utils._
 import org.apache.kafka.common.utils.Utils
@@ -29,7 +30,7 @@ case class BrokerMetadata(brokerId: Int)
   */
 class BrokerMetadataCheckpoint(val file: File) extends Logging {
   private val lock = new Object()
-  new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
+  Files.deleteIfExists(new File(file + ".tmp").toPath()) // try to delete any existing temp files for cleanliness
 
   def write(brokerMetadata: BrokerMetadata) = {
     lock synchronized {
@@ -39,10 +40,13 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
         brokerMetaProps.setProperty("broker.id", brokerMetadata.brokerId.toString)
         val temp = new File(file.getAbsolutePath + ".tmp")
         val fileOutputStream = new FileOutputStream(temp)
-        brokerMetaProps.store(fileOutputStream,"")
-        fileOutputStream.flush()
-        fileOutputStream.getFD().sync()
-        fileOutputStream.close()
+        try {
+          brokerMetaProps.store(fileOutputStream, "")
+          fileOutputStream.flush()
+          fileOutputStream.getFD().sync()
+        } finally {
+          Utils.closeQuietly(fileOutputStream, temp.getName)
+        }
         Utils.atomicMoveWithFallback(temp.toPath, file.toPath)
       } catch {
         case ie: IOException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index eb536f7..84772db 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -129,7 +129,7 @@ case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String
  * @param apiKey API Key for the request
  * @param time @Time object to use
  */
-class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
+final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val apiKey: QuotaType,
                          private val time: Time) extends Logging {

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index 890dde0..3adda76 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -18,7 +18,7 @@ package kafka.server.checkpoints
 
 import java.io._
 import java.nio.charset.StandardCharsets
-import java.nio.file.{FileSystems, Paths}
+import java.nio.file.{FileAlreadyExistsException, FileSystems, Files, Paths}
 import kafka.utils.{Exit, Logging}
 import org.apache.kafka.common.utils.Utils
 import scala.collection.{Seq, mutable}
@@ -33,7 +33,11 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
   private val path = file.toPath.toAbsolutePath
   private val tempPath = Paths.get(path.toString + ".tmp")
   private val lock = new Object()
-  file.createNewFile()
+  try {
+    Files.createFile(file.toPath) // create the file if it doesn't exist
+  } catch {
+    case _ :FileAlreadyExistsException => {}
+  }
 
   def write(entries: Seq[T]) {
     lock synchronized {
@@ -111,4 +115,4 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index bb9a65d..77d6bc1 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -77,21 +77,24 @@ object ImportZkOffsets extends Logging {
   }
 
   private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = {
-    val fr = new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8)
-    val br = new BufferedReader(fr)
-    var partOffsetsMap: Map[String,String] = Map()
-    
-    var s: String = br.readLine()
-    while ( s != null && s.length() >= 1) {
-      val tokens = s.split(":")
-      
-      partOffsetsMap += tokens(0) -> tokens(1)
-      debug("adding node path [" + s + "]")
-      
-      s = br.readLine()
+    val br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))
+    try {
+      var partOffsetsMap: Map[String,String] = Map()
+
+      var s: String = br.readLine()
+      while ( s != null && s.length() >= 1) {
+        val tokens = s.split(":")
+
+        partOffsetsMap += tokens(0) -> tokens(1)
+        debug("adding node path [" + s + "]")
+
+        s = br.readLine()
+      }
+
+      partOffsetsMap
+    } finally {
+      br.close()
     }
-    
-    partOffsetsMap
   }
   
   private def updateZkOffsets(zkUtils: ZkUtils, partitionOffsets: Map[String,String]): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 5d88b4e..5dd8efc 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -239,7 +239,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
       // Create consumers
       val mirrorMakerConsumers = if (useOldConsumer) {
-        val customRebalanceListener = {
+        val customRebalanceListener :Option[ConsumerRebalanceListener] = {
           val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
           if (customRebalanceListenerClass != null) {
             val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
@@ -252,9 +252,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
             None
           }
         }
-
-        if (customRebalanceListener.exists(!_.isInstanceOf[ConsumerRebalanceListener]))
-          throw new IllegalArgumentException("The rebalance listener should be an instance of kafka.consumer.ConsumerRebalanceListener")
         createOldConsumers(
           numStreams,
           consumerProps,
@@ -262,7 +259,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           Option(options.valueOf(whitelistOpt)),
           Option(options.valueOf(blacklistOpt)))
       } else {
-        val customRebalanceListener = {
+        val customRebalanceListener :Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener] = {
           val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
           if (customRebalanceListenerClass != null) {
             val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
@@ -275,9 +272,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
             None
           }
         }
-        if (customRebalanceListener.exists(!_.isInstanceOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener]))
-          throw new IllegalArgumentException("The rebalance listener should be an instance of" +
-            "org.apache.kafka.clients.consumer.ConsumerRebalanceListner")
         createNewConsumers(
           numStreams,
           consumerProps,
@@ -518,6 +512,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
                                        filterSpec: TopicFilter) extends MirrorMakerBaseConsumer
{
     private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null
     private var immediateCommitRequested: Boolean = false
+    private var numCommitsNotified: Long = 0
 
     override def init() {
       // Creating one stream per each connector instance
@@ -532,7 +527,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         // only wait() if mirrorMakerConsumer has been initialized and it has not been cleaned
up.
         if (iter != null) {
           immediateCommitRequested = true
-          this.wait()
+          val nextNumCommitsNotified = numCommitsNotified + 1
+          do {
+            this.wait()
+          } while (numCommitsNotified < nextNumCommitsNotified)
         }
       }
     }
@@ -540,6 +538,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     override def notifyCommit() {
       this.synchronized {
         immediateCommitRequested = false
+        numCommitsNotified = numCommitsNotified + 1
         this.notifyAll()
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/core/src/main/scala/kafka/utils/FileLock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index 896c300..649fc92 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -18,6 +18,7 @@
 
 import java.io._
 import java.nio.channels._
+import java.nio.file.{FileAlreadyExistsException, Files}
 
 /**
  * A file lock a la flock/funlock
@@ -25,7 +26,11 @@ import java.nio.channels._
  * The given path will be created and opened if it doesn't exist.
  */
 class FileLock(val file: File) extends Logging {
-  file.createNewFile() // create the file if it doesn't exist
+  try {
+    Files.createFile(file.toPath) // create the file if it doesn't exist
+  } catch {
+    case _ :FileAlreadyExistsException => {}
+  }
   private val channel = new RandomAccessFile(file, "rw").getChannel()
   private var flock: java.nio.channels.FileLock = null
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ac0f30/gradle/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 0a64e5f..26af433 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -15,15 +15,97 @@
    limitations under the License.
 -->
 
+<!-- Findbugs filtering.
+
+Findbugs is a static code analysis tool run as part of the "check" phase of the build.
+This file dictates which categories of bugs and individual false positives we suppress.
+
+For a detailed description of findbugs bug categories, see http://findbugs.sourceforge.net/bugDescriptions.html
+-->
 <FindBugsFilter>
-    <!-- Exclude a few findbugs codes.
-         EI: May expose internal representation by returning reference to mutable object.
-         EI2: May expose internal representation by incorporating reference to mutable object.
-         MS: Malicious code vulnerability.
-         See http://findbugs.sourceforge.net/bugDescriptions.html for a full description
of the findbugs codes.
-    -->
-    <Match>
-        <Bug code="EI,EI2,MS"/>
+    <Match>
+        <!-- Disable warnings about mutable objects and the use of public fields.
+            EI_EXPOSE_REP: May expose internal representation by returning reference to mutable
object
+            EI_EXPOSE_REP2: May expose internal representation by incorporating reference
to mutable object
+            MS_PKGPROTECT: Field should be package protected -->
+        <Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2,MS_PKGPROTECT"/>
+    </Match>
+
+    <Match>
+        <!-- Disable warnings about System.exit, until we decide to stop using it.
+            DM_EXIT: Method invokes System.exit -->
+        <Bug pattern="DM_EXIT"/>
+    </Match>
+
+    <Match>
+        <!-- Disable warnings about the lack of equals() when compareTo() is implemented.
+            EQ_COMPARETO_USE_OBJECT_EQUALS: This class defines a compareTo method but no
equals() method -->
+        <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
+    </Match>
+
+    <Match>
+        <!-- Findbugs tends to work a little bit better with Java than with Scala.  We
suppress
+             some categories of bug reports when using Scala, since findbugs generates huge
+             numbers of false positives in some of these categories when examining Scala
code.
+
+            NP_LOAD_OF_KNOWN_NULL_VALUE: The variable referenced at this point is known to
be null
+            due to an earlier check against null.
+            NP_NULL_PARAM_DEREF: Method call passes null for non-null parameter.
+            NP_NULL_ON_SOME_PATH: Possible null pointer dereference
+            SE_BAD_FIELD: Non-transient non-serializable instance field in serializable class.
+            DM_STRING_CTOR: Method invokes inefficient new String(String) constructor.
+            DM_NEW_FOR_GETCLASS: Method allocates an object, only to get the class object.
+            ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD: Write to static field from instance
method.
+            DM_NUMBER_CTOR: Method invokes inefficient Number constructor; use static valueOf
instead.
+            RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE: Nullcheck of value previously
dereferenced.
+            RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE: Redundant nullcheck of value known
to be non-null.
+            RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE: Redundant nullcheck of value known to
be null.
+            RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT: Return value of method without side effect
is ignored.
+            NM_CLASS_NAMING_CONVENTION: Class names should start with an upper case letter.
+            NM_METHOD_NAMING_CONVENTION: Method names should start with a lower case letter.
-->
+        <Source name="~.*\.scala" />
+        <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE,NP_NULL_ON_SOME_PATH,NP_NULL_PARAM_DEREF,SE_BAD_FIELD,DM_STRING_CTOR,DM_NEW_FOR_GETCLASS,ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD,DM_NUMBER_CTOR,RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE,RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE,RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE,RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT,NM_CLASS_NAMING_CONVENTION,NM_METHOD_NAMING_CONVENTION"/>
+    </Match>
+
+    <Match>
+        <!-- Add a suppression for KAFKA-4897: LogCleaner#cleanSegments should not ignore
failures to delete files.
+            TODO: remove this suppression when KAFKA-4897 is fixed. -->
+        <Class name="kafka.log.Cleaner"/>
+        <Method name="cleanSegments"/>
+        <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+    </Match>
+
+    <Match>
+        <!-- Add a suppression for ignoring the return value of CountDownLatch#await.
-->
+        <Class name="kafka.log.Cleaner"/>
+        <Method name="cleanOrSleep"/>
+        <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+    </Match>
+
+    <Match>
+        <!-- Add a suppression for having the thread start in the constructor of the old,
deprecated producer. -->
+        <Class name="kafka.producer.Producer"/>
+        <Bug pattern="SC_START_IN_CTOR"/>
+    </Match>
+
+    <Match>
+        <!-- Add a suppression for the equals() method of NetworkClientBlockingOps. -->
+        <Class name="kafka.utils.NetworkClientBlockingOps"/>
+        <Bug pattern="EQ_UNUSUAL"/>
+    </Match>
+
+    <Match>
+        <!-- Add a suppression for auto-generated calls to instanceof in kafka.utils.Json
-->
+        <Source name="Json.scala"/>
+        <Package name="kafka.utils"/>
+        <Bug pattern="BC_VACUOUS_INSTANCEOF"/>
+    </Match>
+
+    <Match>
+        <!-- Add a suppression for a false locking warning. -->
+        <Package name="kafka.consumer"/>
+        <Source name="ZookeeperConsumerConnector.scala"/>
+        <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH"/>
     </Match>
 
     <Match>


Mime
View raw message