kafka-commits mailing list archives

From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Use string/log interpolation instead of string concat in core and clients (#5850)
Date Mon, 29 Oct 2018 13:37:35 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9a0ea25  MINOR: Use string/log interpolation instead of string concat in core and clients (#5850)
9a0ea25 is described below

commit 9a0ea25fee85837748145d37c69cf4d9bb7f9933
Author: Mickael Maison <mimaison@users.noreply.github.com>
AuthorDate: Mon Oct 29 13:37:23 2018 +0000

    MINOR: Use string/log interpolation instead of string concat in core and clients (#5850)
    
    Also removed a few unused imports and tweaked the log message slightly.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../apache/kafka/clients/CommonClientConfigs.java  |  4 +--
 .../kafka/clients/consumer/StickyAssignor.java     | 17 ++++++-----
 .../producer/internals/TransactionManager.java     |  2 +-
 .../kafka/common/record/MultiRecordsSend.java      |  2 +-
 .../apache/kafka/common/utils/AppInfoParser.java   |  6 ++--
 core/src/main/scala/kafka/admin/AdminUtils.scala   |  4 +--
 .../coordinator/group/GroupMetadataManager.scala   |  4 +--
 .../transaction/TransactionStateManager.scala      |  1 -
 core/src/main/scala/kafka/log/AbstractIndex.scala  |  2 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  6 ++--
 .../main/scala/kafka/log/LogCleanerManager.scala   |  2 --
 core/src/main/scala/kafka/log/LogManager.scala     | 34 +++++++++++-----------
 core/src/main/scala/kafka/log/OffsetIndex.scala    |  1 -
 .../main/scala/kafka/network/SocketServer.scala    |  8 ++---
 .../kafka/server/DelegationTokenManager.scala      | 18 ++++++------
 .../main/scala/kafka/server/ThrottledChannel.scala |  2 +-
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |  4 +--
 .../kafka/tools/ReplicaVerificationTool.scala      |  6 ++--
 core/src/main/scala/kafka/utils/CoreUtils.scala    |  2 +-
 core/src/main/scala/kafka/utils/FileLock.scala     |  6 ++--
 .../main/scala/kafka/utils/KafkaScheduler.scala    |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  6 ++--
 .../kafka/api/AuthorizerIntegrationTest.scala      |  2 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |  2 +-
 .../kafka/server/DelayedFetchTest.scala            |  2 +-
 .../src/test/scala/other/kafka/StressTestLog.scala |  1 -
 .../transaction/TransactionStateManagerTest.scala  |  1 -
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  1 -
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  1 -
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  1 -
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  1 -
 .../server/epoch/LeaderEpochFileCacheTest.scala    |  1 -
 32 files changed, 71 insertions(+), 81 deletions(-)
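
To make the pattern in the diff concrete: the Java client changes switch to SLF4J's parameterized logging, and the Scala core changes switch to the s-interpolator. A minimal standalone sketch of the before/after styles (not part of this commit; it assumes only slf4j-api on the classpath, and the object and variable names are illustrative):

    import org.slf4j.LoggerFactory

    object LogStyleDemo extends App {
      private val log = LoggerFactory.getLogger("LogStyleDemo")
      val partition = "topic-0"

      // Before: the argument string is built even when DEBUG is disabled.
      log.debug(partition + " is assigned to more than one consumer.")

      // After: SLF4J fills the {} placeholders only if DEBUG is enabled,
      // so disabled log sites cost almost nothing.
      log.debug("{} is assigned to more than one consumer.", partition)

      // The Scala-side equivalent replaces "a" + x + "b" with s"a${x}b".
      println(s"Cleaner $partition: ")
    }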

diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index c8e2357..491b5de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -113,8 +113,8 @@ public class CommonClientConfigs {
         HashMap<String, Object> rval = new HashMap<>();
         if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
                 config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
-            log.debug("Disabling exponential reconnect backoff because " + RECONNECT_BACKOFF_MS_CONFIG
+
-                " is set, but " + RECONNECT_BACKOFF_MAX_MS_CONFIG + " is not.");
+            log.debug("Disabling exponential reconnect backoff because {} is set, but {}
is not.",
+                    RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
             rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
         }
         return rval;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 0d74eed..4be34c2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -334,7 +334,7 @@ public class StickyAssignor extends AbstractPartitionAssignor {
             List<TopicPartition> topicPartitions = entry.getValue();
             for (TopicPartition topicPartition: topicPartitions) {
                 if (allPartitions.containsKey(topicPartition))
-                    log.error(topicPartition + " is assigned to more than one consumer.");
+                    log.error("{} is assigned to more than one consumer.", topicPartition);
                 allPartitions.put(topicPartition, entry.getKey());
             }
         }
@@ -356,7 +356,8 @@ public class StickyAssignor extends AbstractPartitionAssignor {
                     String otherConsumer = allPartitions.get(topicPartition);
                     int otherConsumerPartitionCount = currentAssignment.get(otherConsumer).size();
                     if (consumerPartitionCount < otherConsumerPartitionCount) {
-                        log.debug(topicPartition + " can be moved from consumer " + otherConsumer + " to consumer " + consumer + " for a more balanced assignment.");
+                        log.debug("{} can be moved from consumer {} to consumer {} for a more balanced assignment.",
+                                topicPartition, otherConsumer, consumer);
                         return false;
                     }
                 }
@@ -499,7 +500,7 @@ public class StickyAssignor extends AbstractPartitionAssignor {
         int currentAssignmentSize = currentPartitions.size();
         int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size();
         if (currentAssignmentSize > maxAssignmentSize)
-            log.error("The consumer " + consumer + " is assigned more partitions than the
maximum possible.");
+            log.error("The consumer {} is assigned more partitions than the maximum possible.",
consumer);
 
         if (currentAssignmentSize < maxAssignmentSize)
             // if a consumer is not assigned all its potential partitions it is subject to reassignment
@@ -598,12 +599,12 @@ public class StickyAssignor extends AbstractPartitionAssignor {
 
                 // the partition must have at least two consumers
                 if (partition2AllPotentialConsumers.get(partition).size() <= 1)
-                    log.error("Expected more than one potential consumer for partition '"
+ partition + "'");
+                    log.error("Expected more than one potential consumer for partition '{}'",
partition);
 
                 // the partition must have a current consumer
                 String consumer = currentPartitionConsumer.get(partition);
                 if (consumer == null)
-                    log.error("Expected partition '" + partition + "' to be assigned to a
consumer");
+                    log.error("Expected partition '{}' to be assigned to a consumer", partition);
 
                 // check if a better-suited consumer exist for the partition; if so, reassign it
                 for (String otherConsumer: partition2AllPotentialConsumers.get(partition)) {
@@ -879,7 +880,7 @@ public class StickyAssignor extends AbstractPartitionAssignor {
                 List<String> path = new ArrayList<>(Collections.singleton(pair.srcMemberId));
                 if (isLinked(pair.dstMemberId, pair.srcMemberId, reducedPairs, path) && !in(path, cycles)) {
                     cycles.add(new ArrayList<>(path));
-                    log.error("A cycle of length " + (path.size() - 1) + " was found: " +
path.toString());
+                    log.error("A cycle of length {} was found: {}", path.size() - 1, path.toString());
                 }
             }
 
@@ -896,9 +897,9 @@ public class StickyAssignor extends AbstractPartitionAssignor {
             for (Map.Entry<String, Map<ConsumerPair, Set<TopicPartition>>>
topicMovements: this.partitionMovementsByTopic.entrySet()) {
                 Set<ConsumerPair> topicMovementPairs = topicMovements.getValue().keySet();
                 if (hasCycles(topicMovementPairs)) {
-                    log.error("Stickiness is violated for topic " + topicMovements.getKey()
+                    log.error("Stickiness is violated for topic {}"
                             + "\nPartition movements for this topic occurred among the following
consumer pairs:"
-                            + "\n" + topicMovements.getValue().toString());
+                            + "\n{}", topicMovements.getKey(), topicMovements.getValue().toString());
                     return false;
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 2cbd1e9..620991b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -196,7 +196,7 @@ public class TransactionManager {
     }
 
     TransactionManager() {
-        this(new LogContext(), null, 0, 100);
+        this(new LogContext(), null, 0, 100L);
     }
 
     public synchronized TransactionalRequestResult initializeTransactions() {
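
A brief aside on the one non-logging tweak above: the L suffix added to the constructor argument implies the parameter it feeds is a long, and writing 100L matches the literal's type to the parameter instead of relying on implicit int-to-long widening. The same idea in a hedged Scala sketch (the helper below is hypothetical, standing in for a constructor that takes a long):

    object LiteralWideningDemo extends App {
      // Hypothetical helper representing a parameter declared as long.
      def withBackoff(retryBackoffMs: Long): String = s"backoff=${retryBackoffMs}ms"

      println(withBackoff(100))   // legal: Int widens to Long implicitly
      println(withBackoff(100L))  // explicit: the literal's type matches the parameter
    }
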
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
index 2bc8d1c..2e78a17 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
@@ -102,7 +102,7 @@ public class MultiRecordsSend implements Send {
         totalWritten += totalWrittenPerCall;
 
         if (completed() && totalWritten != size)
-            log.error("mismatch in sending bytes over socket; expected: " + size + " actual:
" + totalWritten);
+            log.error("mismatch in sending bytes over socket; expected: {} actual: {}", size,
totalWritten);
 
         log.trace("Bytes written as part of multi-send call: {}, total bytes written so far:
{}, expected bytes to write: {}",
                 totalWrittenPerCall, totalWritten, size);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 17c4ba1..8a12fbc 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -41,7 +41,7 @@ public class AppInfoParser {
         try (InputStream resourceStream = AppInfoParser.class.getResourceAsStream("/kafka/kafka-version.properties")) {
             props.load(resourceStream);
         } catch (Exception e) {
-            log.warn("Error while loading kafka-version.properties :" + e.getMessage());
+            log.warn("Error while loading kafka-version.properties: {}", e.getMessage());
         }
         VERSION = props.getProperty("version", "unknown").trim();
         COMMIT_ID = props.getProperty("commitId", "unknown").trim();
@@ -106,8 +106,8 @@ public class AppInfoParser {
     public static class AppInfo implements AppInfoMBean {
 
         public AppInfo() {
-            log.info("Kafka version : " + AppInfoParser.getVersion());
-            log.info("Kafka commitId : " + AppInfoParser.getCommitId());
+            log.info("Kafka version: {}", AppInfoParser.getVersion());
+            log.info("Kafka commitId: {}", AppInfoParser.getCommitId());
         }
 
         @Override
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 4dae6eb..e62e5a8 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -473,10 +473,10 @@ object AdminUtils extends Logging with AdminUtilities {
       val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))
 
       if (!update) {
-        info("Topic creation " + jsonPartitionData.toString)
+        info(s"Topic creation $jsonPartitionData")
         zkUtils.createPersistentPath(zkPath, jsonPartitionData)
       } else {
-        info("Topic update " + jsonPartitionData.toString)
+        info(s"Topic update $jsonPartitionData")
         zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
       }
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 626aaad..260b802 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -29,7 +29,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1}
 import kafka.common.{MessageFormatter, OffsetAndMetadata}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{FetchHighWatermark, FetchLogEnd, ReplicaManager}
+import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
 import kafka.utils._
 import kafka.zk.KafkaZkClient
@@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{IsolationLevel, OffsetCommitRequest, OffsetFetchResponse}
+import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 589407c..87e6d13 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -33,7 +33,6 @@ import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
-import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{Time, Utils}
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index bf5cc25..b75ab8a 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.{Closeable, File, IOException, RandomAccessFile}
+import java.io.{Closeable, File, RandomAccessFile}
 import java.nio.channels.FileChannel
 import java.nio.file.Files
 import java.nio.{ByteBuffer, MappedByteBuffer}
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index aca8154..8449e39 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -267,7 +267,7 @@ class LogCleaner(initialConfig: CleanerConfig,
    * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
    */
   private class CleanerThread(threadId: Int)
-    extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
+    extends ShutdownableThread(name = s"kafka-log-cleaner-thread-$threadId", isInterruptible = false) {
 
     protected override def loggerName = classOf[LogCleaner].getName
 
@@ -458,7 +458,7 @@ private[log] class Cleaner(val id: Int,
 
   protected override def loggerName = classOf[LogCleaner].getName
 
-  this.logIdent = "Cleaner " + id + ": "
+  this.logIdent = s"Cleaner $id: "
 
   /* buffer used for read i/o */
   private var readBuffer = ByteBuffer.allocate(ioBufferSize)
@@ -752,7 +752,7 @@ private[log] class Cleaner(val id: Int,
     if(readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
       throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxBufferSize))
     val newSize = math.min(this.readBuffer.capacity * 2, maxBufferSize)
-    info("Growing cleaner I/O buffers from " + readBuffer.capacity + "bytes to " + newSize
+ " bytes.")
+    info(s"Growing cleaner I/O buffers from ${readBuffer.capacity} bytes to $newSize bytes.")
     this.readBuffer = ByteBuffer.allocate(newSize)
     this.writeBuffer = ByteBuffer.allocate(newSize)
   }
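
A note on why the Scala rewrites here do not regress performance: core's kafka.utils.Logging trait takes its message by name, so an interpolated string passed to debug(...) is only rendered when the level is enabled. A toy model of that idiom (the names below are illustrative, not Kafka's actual classes):

    object LazyMessageDemo extends App {
      var debugEnabled = false
      // By-name parameter: the argument expression is evaluated only on use.
      def debug(msg: => String): Unit = if (debugEnabled) println(s"DEBUG $msg")

      var built = 0
      def expensive(): String = { built += 1; "rendered" }

      debug(s"state: ${expensive()}")
      assert(built == 0) // DEBUG off: the interpolation never ran

      debugEnabled = true
      debug(s"state: ${expensive()}")
      assert(built == 1) // DEBUG on: evaluated exactly once
    }
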
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 13d14c1..2fc7b74 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -59,8 +59,6 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
                                      val logs: Pool[TopicPartition, Log],
                                      val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
 
-  import LogCleanerManager._
-
   protected override def loggerName = classOf[LogCleaner].getName
 
   // package-private for testing
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index bcf3801..39029b0 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -256,7 +256,7 @@ class LogManager(logDirs: Seq[File],
   private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
 
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
-    debug("Loading log '" + logDir.getName + "'")
+    debug(s"Loading log '${logDir.getName}'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
     val config = topicConfigs.getOrElse(topicPartition.topic, currentDefaultConfig)
     val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
@@ -324,7 +324,7 @@ class LogManager(logDirs: Seq[File],
           recoveryPoints = this.recoveryPointCheckpoints(dir).read
         } catch {
           case e: Exception =>
-            warn("Error occurred while reading recovery-point-offset-checkpoint file of directory
" + dir, e)
+            warn(s"Error occurred while reading recovery-point-offset-checkpoint file of
directory $dir", e)
             warn("Resetting the recovery checkpoint to 0")
         }
 
@@ -333,7 +333,7 @@ class LogManager(logDirs: Seq[File],
           logStartOffsets = this.logStartOffsetCheckpoints(dir).read
         } catch {
           case e: Exception =>
-            warn("Error occurred while reading log-start-offset-checkpoint file of directory
" + dir, e)
+            warn(s"Error occurred while reading log-start-offset-checkpoint file of directory
$dir", e)
         }
 
         val jobsForDir = for {
@@ -346,7 +346,7 @@ class LogManager(logDirs: Seq[File],
             } catch {
               case e: IOException =>
                 offlineDirs.add((dir.getAbsolutePath, e))
-                error("Error while loading log dir " + dir.getAbsolutePath, e)
+                error(s"Error while loading log dir ${dir.getAbsolutePath}", e)
             }
           }
         }
@@ -354,7 +354,7 @@ class LogManager(logDirs: Seq[File],
       } catch {
         case e: IOException =>
           offlineDirs.add((dir.getAbsolutePath, e))
-          error("Error while loading log dir " + dir.getAbsolutePath, e)
+          error(s"Error while loading log dir ${dir.getAbsolutePath}", e)
       }
     }
 
@@ -375,7 +375,7 @@ class LogManager(logDirs: Seq[File],
       }
     } catch {
       case e: ExecutionException =>
-        error("There was an error in one of the threads during logs loading: " + e.getCause)
+        error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
         throw e.getCause
     } finally {
       threadPools.foreach(_.shutdown())
@@ -442,7 +442,7 @@ class LogManager(logDirs: Seq[File],
 
     // close logs in each dir
     for (dir <- liveLogDirs) {
-      debug("Flushing and closing logs at " + dir)
+      debug(s"Flushing and closing logs at $dir")
 
       val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
       threadPools.append(pool)
@@ -465,19 +465,19 @@ class LogManager(logDirs: Seq[File],
         dirJobs.foreach(_.get)
 
         // update the last flush point
-        debug("Updating recovery points at " + dir)
+        debug(s"Updating recovery points at $dir")
         checkpointLogRecoveryOffsetsInDir(dir)
 
-        debug("Updating log start offsets at " + dir)
+        debug(s"Updating log start offsets at $dir")
         checkpointLogStartOffsetsInDir(dir)
 
         // mark that the shutdown was clean by creating marker file
-        debug("Writing clean shutdown marker at " + dir)
+        debug(s"Writing clean shutdown marker at $dir")
         CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
       }
     } catch {
       case e: ExecutionException =>
-        error("There was an error in one of the threads during LogManager shutdown: " + e.getCause)
+        error(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}")
         throw e.getCause
     } finally {
       threadPools.foreach(_.shutdown())
@@ -894,13 +894,13 @@ class LogManager(logDirs: Seq[File],
     try {
       deletableLogs.foreach {
         case (topicPartition, log) =>
-          debug("Garbage collecting '" + log.name + "'")
+          debug(s"Garbage collecting '${log.name}'")
           total += log.deleteOldSegments()
 
           val futureLog = futureLogs.get(topicPartition)
           if (futureLog != null) {
             // clean future logs
-            debug("Garbage collecting future log '" + futureLog.name + "'")
+            debug(s"Garbage collecting future log '${futureLog.name}'")
             total += futureLog.deleteOldSegments()
           }
       }
@@ -910,7 +910,7 @@ class LogManager(logDirs: Seq[File],
       }
     }
 
-    debug("Log cleanup completed. " + total + " files deleted in " +
+    debug(s"Log cleanup completed. $total files deleted in " +
                   (time.milliseconds - startMs) / 1000 + " seconds")
   }
 
@@ -952,13 +952,13 @@ class LogManager(logDirs: Seq[File],
     for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
       try {
         val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
-        debug("Checking if flush is needed on " + topicPartition.topic + " flush interval
 " + log.config.flushMs +
-              " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
+        debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}"
+
+              s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
         if(timeSinceLastFlush >= log.config.flushMs)
           log.flush
       } catch {
         case e: Throwable =>
-          error("Error flushing topic " + topicPartition.topic, e)
+          error(s"Error flushing topic ${topicPartition.topic}", e)
       }
     }
   }
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 23dabf7..6f246ee 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -20,7 +20,6 @@ package kafka.log
 import java.io.File
 import java.nio.ByteBuffer
 
-import kafka.common.IndexOffsetOverflowException
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.errors.InvalidOffsetException
 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b5b3e4d..ae09a03 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -133,7 +133,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         }
       }
     )
-    info("Started " + acceptors.size + " acceptor threads")
+    info(s"Started ${acceptors.size} acceptor threads")
   }
 
   /**
@@ -335,7 +335,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
    */
   def close(channel: SocketChannel): Unit = {
     if (channel != null) {
-      debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
+      debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress()}")
       connectionQuotas.dec(channel.socket.getInetAddress)
       CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
       CoreUtils.swallow(channel.close(), this, Level.ERROR)
@@ -627,7 +627,7 @@ private[kafka] class Processor(val id: Int,
         }
       }
     } finally {
-      debug("Closing selector - processor " + id)
+      debug(s"Closing selector - processor $id")
       CoreUtils.swallow(closeAll(), this, Level.ERROR)
       shutdownComplete()
     }
@@ -658,7 +658,7 @@ private[kafka] class Processor(val id: Int,
             // There is no response to send to the client, we need to read more pipelined requests
             // that are sitting in the server's socket buffer
             updateRequestMetrics(response)
-            trace("Socket server received empty response to send, registering for read: "
+ response)
+            trace(s"Socket server received empty response to send, registering for read:
$response")
             // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
             // it will be unmuted immediately. If the channel has been throttled, it will be unmuted only if the
             // throttling delay has already passed by now.
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 0eb9b07..90025f4 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -161,7 +161,7 @@ class DelegationTokenManager(val config: KafkaConfig,
                              val tokenCache: DelegationTokenCache,
                              val time: Time,
                              val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Token Manager on Broker " + config.brokerId + "]: "
+  this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: "
 
   import DelegationTokenManager._
 
@@ -201,7 +201,7 @@ class DelegationTokenManager(val config: KafkaConfig,
   private def loadCache() {
     lock.synchronized {
       val tokens = zkClient.getChildren(DelegationTokensZNode.path)
-      info(s"Loading the token cache. Total token count : " + tokens.size)
+      info(s"Loading the token cache. Total token count: ${tokens.size}")
       for (tokenId <- tokens) {
         try {
           getTokenFromZk(tokenId) match {
@@ -209,7 +209,7 @@ class DelegationTokenManager(val config: KafkaConfig,
             case None =>
           }
         } catch {
-          case ex: Throwable => error(s"Error while getting Token for tokenId :$tokenId", ex)
+          case ex: Throwable => error(s"Error while getting Token for tokenId: $tokenId", ex)
         }
       }
     }
@@ -279,7 +279,7 @@ class DelegationTokenManager(val config: KafkaConfig,
         val hmac = createHmac(tokenId, secretKey)
         val token = new DelegationToken(tokenInfo, hmac)
         updateToken(token)
-        info(s"Created a delegation token : $tokenId for owner : $owner")
+        info(s"Created a delegation token: $tokenId for owner: $owner")
         responseCallback(CreateTokenResult(issueTimeStamp, expiryTimeStamp, maxLifeTimeStamp, tokenId, hmac, Errors.NONE))
       }
     }
@@ -317,7 +317,7 @@ class DelegationTokenManager(val config: KafkaConfig,
               tokenInfo.setExpiryTimestamp(expiryTimeStamp)
 
               updateToken(token)
-              info(s"Delegation token renewed for token : " + tokenInfo.tokenId + " for owner
:" + tokenInfo.owner)
+              info(s"Delegation token renewed for token: ${tokenInfo.tokenId} for owner:
${tokenInfo.owner}")
               renewCallback(Errors.NONE, expiryTimeStamp)
             }
           }
@@ -412,7 +412,7 @@ class DelegationTokenManager(val config: KafkaConfig,
               expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
             } else if (expireLifeTimeMs < 0) { //expire immediately
               removeToken(tokenInfo.tokenId)
-              info(s"Token expired for token : " + tokenInfo.tokenId + " for owner :" + tokenInfo.owner)
+              info(s"Token expired for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}")
               expireResponseCallback(Errors.NONE, now)
             } else {
               //set expiry time stamp
@@ -420,7 +420,7 @@ class DelegationTokenManager(val config: KafkaConfig,
               tokenInfo.setExpiryTimestamp(expiryTimeStamp)
 
               updateToken(token)
-              info(s"Updated expiry time for token : " + tokenInfo.tokenId + " for owner
:" + tokenInfo.owner)
+              info(s"Updated expiry time for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}")
               expireResponseCallback(Errors.NONE, expiryTimeStamp)
             }
           }
@@ -457,7 +457,7 @@ class DelegationTokenManager(val config: KafkaConfig,
       for (tokenInfo <- getAllTokenInformation) {
         val now = time.milliseconds
         if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
-          info(s"Delegation token expired for token : " + tokenInfo.tokenId + " for owner
:" + tokenInfo.owner)
+          info(s"Delegation token expired for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}")
           removeToken(tokenInfo.tokenId)
         }
       }
@@ -480,7 +480,7 @@ class DelegationTokenManager(val config: KafkaConfig,
     override def processNotification(tokenIdBytes: Array[Byte]) {
       lock.synchronized {
         val tokenId = new String(tokenIdBytes, StandardCharsets.UTF_8)
-        info(s"Processing Token Notification for tokenId : $tokenId")
+        info(s"Processing Token Notification for tokenId: $tokenId")
         getTokenFromZk(tokenId) match {
           case Some(token) => updateCache(token)
           case None => removeCache(tokenId)
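
Several DelegationTokenManager messages above had a mixed style, an s-interpolated prefix concatenated with further + terms; the patch folds each into a single interpolated string and normalizes the spacing around colons. A small sketch of the cleanup (TokenInfo here is a stand-in, not Kafka's class):

    object MixedInterpolationDemo extends App {
      final case class TokenInfo(tokenId: String, owner: String)
      val t = TokenInfo("token-42", "alice")

      // Mixed style removed by the patch: the s prefix does nothing useful here.
      val before = s"Token expired for token : " + t.tokenId + " for owner :" + t.owner

      // Uniform style after the patch: one interpolated string.
      val after = s"Token expired for token: ${t.tokenId} for owner: ${t.owner}"

      println(before)
      println(after)
    }
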
diff --git a/core/src/main/scala/kafka/server/ThrottledChannel.scala b/core/src/main/scala/kafka/server/ThrottledChannel.scala
index 8fe8649..c46188f 100644
--- a/core/src/main/scala/kafka/server/ThrottledChannel.scala
+++ b/core/src/main/scala/kafka/server/ThrottledChannel.scala
@@ -42,7 +42,7 @@ class ThrottledChannel(val request: RequestChannel.Request, val time: Time, val
 
   // Notify the socket server that throttling has been done for this channel.
   def notifyThrottlingDone(): Unit = {
-    trace("Channel throttled for: " + throttleTimeMs + " ms")
+    trace(s"Channel throttled for: $throttleTimeMs ms")
     channelThrottlingCallback(new network.RequestChannel.EndThrottlingResponse(request))
   }
 
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b6fd918..de2eba1 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -359,7 +359,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         record.headers)
 
     override def run() {
-      info("Starting mirror maker thread " + threadName)
+      info(s"Starting mirror maker thread $threadName")
       try {
         consumerWrapper.init()
 
@@ -425,7 +425,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     def shutdown() {
       try {
-        info(threadName + " shutting down")
+        info(s"$threadName shutting down")
         shuttingDown = true
         consumerWrapper.wakeup()
       }
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 9a5ac7b..6edd315 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -118,7 +118,7 @@ object ReplicaVerificationTool extends Logging {
     try Pattern.compile(regex)
     catch {
       case _: PatternSyntaxException =>
-        throw new RuntimeException(regex + " is an invalid regex.")
+        throw new RuntimeException(s"$regex is an invalid regex.")
     }
 
     val fetchSize = options.valueOf(fetchSizeOpt).intValue
@@ -199,7 +199,7 @@ object ReplicaVerificationTool extends Logging {
       }
     })
     fetcherThreads.foreach(_.start())
-    println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process is started.")
+    println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is
started.")
 
   }
 
@@ -300,7 +300,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartitio
     debug("Begin verification")
     maxLag = -1L
     for ((topicPartition, fetchResponsePerReplica) <- recordsCache) {
-      debug("Verifying " + topicPartition)
+      debug(s"Verifying $topicPartition")
       assert(fetchResponsePerReplica.size == expectedReplicasPerTopicPartition(topicPartition),
         "fetched " + fetchResponsePerReplica.size + " replicas for " + topicPartition + ",
but expected "
           + expectedReplicasPerTopicPartition(topicPartition) + " replicas")
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 3a4399c..a902701 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -147,7 +147,7 @@ object CoreUtils extends Logging {
       }
     } catch {
       case e: Exception => {
-        error("Failed to register Mbean " + name, e)
+        error(s"Failed to register Mbean $name", e)
         false
       }
     }
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index c0afbfb..fd31b12 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -36,7 +36,7 @@ class FileLock(val file: File) extends Logging {
    */
   def lock() {
     this synchronized {
-      trace("Acquiring lock on " + file.getAbsolutePath)
+      trace(s"Acquiring lock on ${file.getAbsolutePath}")
       flock = channel.lock()
     }
   }
@@ -46,7 +46,7 @@ class FileLock(val file: File) extends Logging {
    */
   def tryLock(): Boolean = {
     this synchronized {
-      trace("Acquiring lock on " + file.getAbsolutePath)
+      trace(s"Acquiring lock on ${file.getAbsolutePath}")
       try {
         // weirdly this method will return null if the lock is held by another
         // process, but will throw an exception if the lock is held by this process
@@ -64,7 +64,7 @@ class FileLock(val file: File) extends Logging {
    */
   def unlock() {
     this synchronized {
-      trace("Releasing lock on " + file.getAbsolutePath)
+      trace(s"Releasing lock on ${file.getAbsolutePath}")
       if(flock != null)
         flock.release()
     }
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 24eb177..b4fae0b 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -113,7 +113,7 @@ class KafkaScheduler(val threads: Int,
           trace("Beginning execution of scheduled task '%s'.".format(name))
           fun()
         } catch {
-          case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'",
t)
+          case t: Throwable => error(s"Uncaught exception in scheduled task '$name'",
t)
         } finally {
           trace("Completed execution of scheduled task '%s'.".format(name))
         }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 331f7bb..dd85090 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -220,7 +220,7 @@ class ZkUtils(val zkClient: ZkClient,
           + "Probably this controller is still using the old format [%s] to store the broker
id in zookeeper".format(controllerInfoString))
         try controllerInfoString.toInt
         catch {
-          case t: Throwable => throw new KafkaException("Failed to parse the controller
info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+          case t: Throwable => throw new KafkaException(s"Failed to parse the controller
info: $controllerInfoString. This is neither the new or the old format.", t)
         }
     }
   }
@@ -415,11 +415,11 @@ class ZkUtils(val zkClient: ZkClient,
           case _: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
         }
         if (storedData == null || storedData != data) {
-          info("conflict in " + path + " data: " + data + " stored data: " + storedData)
+          info(s"conflict in $path data: $data stored data: $storedData")
           throw e
         } else {
           // otherwise, the creation succeeded, return normally
-          info(path + " exists with value " + data + " during connection loss; this is ok")
+          info(s"$path exists with value $data during connection loss; this is ok")
         }
     }
   }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a2f1049..9ef7214 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -25,7 +25,7 @@ import kafka.network.SocketServer
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.producer._
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index a1f2cff..7fd68c2 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -18,7 +18,7 @@
 package kafka.api
 
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.{Gauge, Metric, MetricName}
+import com.yammer.metrics.core.Gauge
 
 import java.io.File
 import java.util.ArrayList
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 890ea3b..9ed2274 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.FencedLeaderEpochException
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.{FetchRequest, IsolationLevel}
+import org.apache.kafka.common.requests.FetchRequest
 import org.easymock.{EasyMock, EasyMockSupport}
 import org.junit.Test
 import org.junit.Assert._
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 87e26fe..7821a0e 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -25,7 +25,6 @@ import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
 import org.apache.kafka.common.record.FileRecords
-import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.utils.Utils
 
 /**
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index d2fe7ea..6004cc0 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -28,7 +28,6 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.IsolationLevel
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.MockTime
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b903c4a..a10800f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import java.lang.{Long => JLong}
 import java.net.InetAddress
 import java.util
 import java.util.{Collections, Optional}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 9ca7256..10dba1a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -22,7 +22,6 @@ import java.util.Properties
 import kafka.api.{ApiVersion, KAFKA_0_8_2}
 import kafka.cluster.EndPoint
 import kafka.message._
-import kafka.metrics.KafkaMetricsConfig
 import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.metrics.Sensor
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 9b59e71..0c6d09c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRec
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
 import EasyMock._
-import org.apache.kafka.common.requests.IsolationLevel
 import org.junit.Assert._
 import org.junit.{After, Test}
 
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ecfbd73..1bb0e20 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
-import org.apache.kafka.common.requests.IsolationLevel
 import org.easymock.EasyMock
 import org.junit.Assert._
 
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index 7ac606a..fc4dcfd 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -19,7 +19,6 @@ package kafka.server.epoch
 
 import java.io.File
 
-import kafka.server.LogOffsetMetadata
 import kafka.server.checkpoints.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import kafka.utils.TestUtils

