kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Include response in request log
Date Fri, 08 Sep 2017 03:41:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 07a428e0c -> 27336192f


MINOR: Include response in request log

It's implemented such that there is no overhead if request logging is
disabled.

Also:
- Reduce metrics computation duplication in `updateRequestMetrics`
- Change a couple of log calls to use string interpolation instead of `format`
- Fix a few compiler warnings related to unused imports and unused default
arguments.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Roger Hoover <roger.hoover@gmail.com>, Jason Gustafson <jason@confluent.io>,
Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3801 from ijuma/log-response-in-request-log


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27336192
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27336192
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27336192

Branch: refs/heads/trunk
Commit: 27336192ff62d43d0beabc5dab2855b87befcc55
Parents: 07a428e
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Fri Sep 8 04:40:24 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Sep 8 04:40:24 2017 +0100

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      |  2 -
 .../scala/kafka/network/RequestChannel.scala    | 93 +++++++++++---------
 .../main/scala/kafka/network/SocketServer.scala | 17 ++--
 .../src/main/scala/kafka/server/KafkaApis.scala |  9 +-
 .../kafka/server/KafkaRequestHandler.scala      |  4 +-
 .../kafka/api/ConsumerBounceTest.scala          |  2 +-
 .../kafka/api/EndToEndClusterIdTest.scala       |  2 +-
 .../kafka/api/TransactionsTest.scala            |  2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |  4 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  4 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  2 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala |  4 +-
 .../unit/kafka/network/SocketServerTest.scala   |  6 +-
 14 files changed, 83 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 99b5103..b4ee1fd 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -24,8 +24,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException
 import org.I0Itec.zkclient.{IZkChildListener, IZkStateListener}
 import org.apache.kafka.common.utils.Time
 
-import scala.collection.JavaConverters._
-
 /**
  * Handle the notificationMessage.
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1d95fa0..2d6370a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -27,8 +27,8 @@ import kafka.network.RequestChannel.{ShutdownRequest, BaseRequest}
 import kafka.server.QuotaId
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.network.{ListenerName, Send}
-import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
+import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.protocol.{ApiKeys, Protocol}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
@@ -39,6 +39,8 @@ import scala.reflect.ClassTag
 object RequestChannel extends Logging {
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
+  def isRequestLoggingEnabled: Boolean = requestLogger.isDebugEnabled
+
   sealed trait BaseRequest
   case object ShutdownRequest extends BaseRequest
 
@@ -90,7 +92,7 @@ object RequestChannel extends Logging {
       math.max(apiLocalCompleteTimeNanos - requestDequeueTimeNanos, 0L)
     }
 
-    def updateRequestMetrics(networkThreadTimeNanos: Long) {
+    def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response) {
       val endTimeNanos = Time.SYSTEM.nanoseconds
       // In some corner cases, apiLocalCompleteTimeNanos may not be set when the request
completes if the remote
       // processing time is really small. This value is set in KafkaApis from a request handling
thread.
@@ -103,15 +105,23 @@ object RequestChannel extends Logging {
       if (apiRemoteCompleteTimeNanos < 0)
         apiRemoteCompleteTimeNanos = responseCompleteTimeNanos
 
-      def nanosToMs(nanos: Long) = math.max(TimeUnit.NANOSECONDS.toMillis(nanos), 0)
+      /**
+       * Converts nanos to millis with micros precision as additional decimal places in the
request log have low
+       * signal to noise ratio. When it comes to metrics, there is little difference either
way as we round the value
+       * to the nearest long.
+       */
+      def nanosToMs(nanos: Long): Double = {
+        val positiveNanos = math.max(nanos, 0)
+        TimeUnit.NANOSECONDS.toMicros(positiveNanos).toDouble / TimeUnit.MILLISECONDS.toMicros(1)
+      }
 
-      val requestQueueTime = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
-      val apiLocalTime = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
-      val apiRemoteTime = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
-      val apiThrottleTime = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
-      val responseQueueTime = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
-      val responseSendTime = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
-      val totalTime = nanosToMs(endTimeNanos - startTimeNanos)
+      val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
+      val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
+      val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
+      val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
+      val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
+      val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
+      val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
       val fetchMetricNames =
         if (header.apiKey == ApiKeys.FETCH) {
           val isFromFollower = body[FetchRequest].isFromFollower
@@ -125,13 +135,13 @@ object RequestChannel extends Logging {
       metricNames.foreach { metricName =>
         val m = RequestMetrics.metricsMap(metricName)
         m.requestRate.mark()
-        m.requestQueueTimeHist.update(requestQueueTime)
-        m.localTimeHist.update(apiLocalTime)
-        m.remoteTimeHist.update(apiRemoteTime)
-        m.throttleTimeHist.update(apiThrottleTime)
-        m.responseQueueTimeHist.update(responseQueueTime)
-        m.responseSendTimeHist.update(responseSendTime)
-        m.totalTimeHist.update(totalTime)
+        m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
+        m.localTimeHist.update(Math.round(apiLocalTimeMs))
+        m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
+        m.throttleTimeHist.update(Math.round(apiThrottleTimeMs))
+        m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
+        m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
+        m.totalTimeHist.update(Math.round(totalTimeMs))
       }
 
       // Records network handler thread usage. This is included towards the request quota
for the
@@ -142,28 +152,26 @@ object RequestChannel extends Logging {
       // the total time spent on authentication, which may be significant for SASL/SSL.
       recordNetworkThreadTimeCallback.foreach(record => record(networkThreadTimeNanos))
 
-      if (requestLogger.isDebugEnabled) {
+      if (isRequestLoggingEnabled) {
         val detailsEnabled = requestLogger.isTraceEnabled
-        def nanosToMs(nanos: Long) = TimeUnit.NANOSECONDS.toMicros(math.max(nanos, 0)).toDouble
/ TimeUnit.MILLISECONDS.toMicros(1)
-        val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-        val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
-        val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
-        val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
-        val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
-        val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
-        val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
-
-        requestLogger.debug(s"Completed request:${requestDesc(detailsEnabled)} from connection
${context.connectionId};" +
-          s"totalTime:$totalTimeMs," +
-          s"requestQueueTime:$requestQueueTimeMs," +
-          s"localTime:$apiLocalTimeMs," +
-          s"remoteTime:$apiRemoteTimeMs," +
-          s"throttleTime:$apiThrottleTimeMs," +
-          s"responseQueueTime:$responseQueueTimeMs," +
-          s"sendTime:$responseSendTimeMs," +
-          s"securityProtocol:${context.securityProtocol}," +
-          s"principal:${context.principal}," +
-          s"listener:${context.listenerName.value}")
+        val responseString = response.responseAsString.getOrElse(
+          throw new IllegalStateException("responseAsString should always be defined if request
logging is enabled"))
+
+        val builder = new StringBuilder(256)
+        builder.append("Completed request:").append(requestDesc(detailsEnabled))
+          .append(",response:").append(responseString)
+          .append(" from connection ").append(context.connectionId)
+          .append(";totalTime:").append(totalTimeMs)
+          .append(",requestQueueTime:").append(requestQueueTimeMs)
+          .append(",localTime:").append(apiLocalTimeMs)
+          .append(",remoteTime:").append(apiRemoteTimeMs)
+          .append(",throttleTime:").append(apiThrottleTimeMs)
+          .append(",responseQueueTime:").append(responseQueueTimeMs)
+          .append(",sendTime:").append(responseSendTimeMs)
+          .append(",securityProtocol:").append(context.securityProtocol)
+          .append(",principal:").append(session.principal)
+          .append(",listener:").append(context.listenerName.value)
+        requestLogger.debug(builder.toString)
       }
     }
 
@@ -183,13 +191,16 @@ object RequestChannel extends Logging {
 
   }
 
-  class Response(val request: Request, val responseSend: Option[Send], val responseAction:
ResponseAction) {
+  /** responseAsString should only be defined if request logging is enabled */
+  class Response(val request: Request, val responseSend: Option[Send], val responseAction:
ResponseAction,
+                 val responseAsString: Option[String]) {
     request.responseCompleteTimeNanos = Time.SYSTEM.nanoseconds
     if (request.apiLocalCompleteTimeNanos == -1L) request.apiLocalCompleteTimeNanos = Time.SYSTEM.nanoseconds
 
     def processor: Int = request.processor
 
-    override def toString = s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction)"
+    override def toString =
+      s"Response(request=$request, responseSend=$responseSend, responseAction=$responseAction),
responseAsString=$responseAsString"
   }
 
   trait ResponseAction

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index f1bc926..3d54c8a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -36,8 +36,8 @@ import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Rate
 import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable,
Send, Selector => KSelector}
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.protocol.types.SchemaException
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.utils.{KafkaThread, Time}
@@ -484,7 +484,7 @@ private[kafka] class Processor(val id: Int,
           case RequestChannel.NoOpAction =>
             // There is no response to send to the client, we need to read more pipelined
requests
             // that are sitting in the server's socket buffer
-            updateRequestMetrics(curr.request)
+            updateRequestMetrics(curr)
             trace("Socket server received empty response to send, registering for read: "
+ curr)
             val channelId = curr.request.context.connectionId
             if (selector.channel(channelId) != null || selector.closingChannel(channelId)
!= null)
@@ -494,7 +494,7 @@ private[kafka] class Processor(val id: Int,
               throw new IllegalStateException(s"responseSend must be defined for SendAction,
response: $curr"))
             sendResponse(curr, responseSend)
           case RequestChannel.CloseConnectionAction =>
-            updateRequestMetrics(curr.request)
+            updateRequestMetrics(curr)
             trace("Closing socket connection actively according to the response code.")
             close(selector, curr.request.context.connectionId)
         }
@@ -511,7 +511,7 @@ private[kafka] class Processor(val id: Int,
     // `channel` can be None if the connection was closed remotely or if selector closed
it for being idle for too long
     if (channel(connectionId).isEmpty) {
       warn(s"Attempting to send response via channel for which there is no open connection,
connection id $connectionId")
-      response.request.updateRequestMetrics(0L)
+      response.request.updateRequestMetrics(0L, response)
     }
     // Invoke send for closingChannel as well so that the send is failed and the channel
closed properly and
     // removed from the Selector after discarding any pending staged receives.
@@ -561,14 +561,15 @@ private[kafka] class Processor(val id: Int,
       val resp = inflightResponses.remove(send.destination).getOrElse {
         throw new IllegalStateException(s"Send for ${send.destination} completed, but not
in `inflightResponses`")
       }
-      updateRequestMetrics(resp.request)
+      updateRequestMetrics(resp)
       selector.unmute(send.destination)
     }
   }
 
-  private def updateRequestMetrics(request: RequestChannel.Request) {
+  private def updateRequestMetrics(response: RequestChannel.Response) {
+    val request = response.request
     val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos())
-    request.updateRequestMetrics(networkThreadTimeNanos)
+    request.updateRequestMetrics(networkThreadTimeNanos, response)
   }
 
   private def processDisconnected() {
@@ -576,7 +577,7 @@ private[kafka] class Processor(val id: Int,
       val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
         throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
       }.remoteHost
-      inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
+      inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
       // the channel has been closed by the selector but the quotas still need to be updated
       connectionQuotas.dec(InetAddress.getByName(remoteHost))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4fd906f..2c5517f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2001,16 +2001,19 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def closeConnection(request: RequestChannel.Request): Unit = {
     // This case is used when the request handler has encountered an error, but the client
     // does not expect a response (e.g. when produce request has acks set to 0)
-    requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction))
+    requestChannel.sendResponse(new RequestChannel.Response(request, None, CloseConnectionAction,
None))
   }
 
   private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]):
Unit = {
     responseOpt match {
       case Some(response) =>
         val responseSend = request.context.buildResponse(response)
-        requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend),
SendAction))
+        val responseString =
+          if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.header.apiVersion))
+          else None
+        requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend),
SendAction, responseString))
       case None =>
-        requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction))
+        requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction,
None))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 356ccae..f055762 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -54,14 +54,14 @@ class KafkaRequestHandler(id: Int,
 
       req match {
         case RequestChannel.ShutdownRequest =>
-          debug("Kafka request handler %d on broker %d received shut down command".format(id,
brokerId))
+          debug(s"Kafka request handler $id on broker $brokerId received shut down command")
           latch.countDown()
           return
 
         case request: RequestChannel.Request =>
           try {
             request.requestDequeueTimeNanos = endTime
-            trace("Kafka request handler %d on broker %d handling request %s".format(id,
brokerId, request))
+            trace(s"Kafka request handler $id on broker $brokerId handling request $request")
             apis.handle(request)
           } catch {
             case e: FatalExitError =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 27cafd7..0b42118 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -177,7 +177,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       }, 2, TimeUnit.SECONDS)
     consumer.poll(0)
 
-    def sendRecords(numRecords: Int, topic: String = this.topic) {
+    def sendRecords(numRecords: Int, topic: String) {
       var remainingRecords = numRecords
       val endTimeMs = System.currentTimeMillis + 20000
       while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs)
{

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 2cd0df2..7ec2feb 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -201,7 +201,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   }
 
   private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
-                             numRecords: Int = 1,
+                             numRecords: Int,
                              startingOffset: Int = 0,
                              topic: String = topic,
                              part: Int = part) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 760cc39..d978961 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -529,7 +529,7 @@ class TransactionsTest extends KafkaServerTestHarness {
     consumer
   }
 
-  private def createReadUncommittedConsumer(group: String = "group") = {
+  private def createReadUncommittedConsumer(group: String) = {
     val props = new Properties()
     props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index ff1e8e2..b20622f 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -268,7 +268,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
   }
 
   private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: Log, codec: CompressionType,
-                                        startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE):
Seq[(Int, String, Long)] = {
+                                        startKey: Int = 0, magicValue: Byte): Seq[(Int, String,
Long)] = {
     val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys))
yield {
       val payload = counter.toString
       counter += 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index c9f5441..1146029 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -222,7 +222,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     cleanerManager
   }
 
-  private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
+  private def createLog(segmentSize: Int, cleanupPolicy: String): Log = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
     logProps.put(LogConfig.RetentionMsProp, 1: Integer)
@@ -243,7 +243,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     log
   }
 
-  private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
+  private def makeLog(dir: File = logDir, config: LogConfig) =
     Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler =
time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats, maxProducerIdExpirationMs = 60
* 60 * 1000,
       producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index c29ece5..517e876 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1270,12 +1270,12 @@ class LogCleanerTest extends JUnitSuite {
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short
= 0): Seq[Int] => LogAppendInfo = {
+  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short):
Seq[Int] => LogAppendInfo = {
     appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true)
   }
 
   private def appendIdempotentAsLeader(log: Log, producerId: Long,
-                                       producerEpoch: Short = 0,
+                                       producerEpoch: Short,
                                        isTransactional: Boolean = false): Seq[Int] =>
LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 30ccc8b..2a07532 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -333,7 +333,7 @@ class LogSegmentTest {
   private def endTxnRecords(controlRecordType: ControlRecordType,
                             producerId: Long,
                             producerEpoch: Short,
-                            offset: Long = 0L,
+                            offset: Long,
                             partitionLeaderEpoch: Int = 0,
                             coordinatorEpoch: Int = 0,
                             timestamp: Long = RecordBatch.NO_TIMESTAMP): MemoryRecords =
{

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index a5d415e..96f7bfc 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -947,9 +947,9 @@ class LogValidatorTest {
       isFromClient = true)
   }
 
-  private def createRecords(magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
+  private def createRecords(magicValue: Byte,
                             timestamp: Long = RecordBatch.NO_TIMESTAMP,
-                            codec: CompressionType = CompressionType.NONE): MemoryRecords
= {
+                            codec: CompressionType): MemoryRecords = {
     val buf = ByteBuffer.allocate(512)
     val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME,
0L)
     builder.appendWithOffset(0, timestamp, null, "hello".getBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/27336192/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 6fbb0c9..9b32635 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -107,7 +107,7 @@ class SocketServerTest extends JUnitSuite {
     byteBuffer.rewind()
 
     val send = new NetworkSend(request.context.connectionId, byteBuffer)
-    channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction))
+    channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, None))
   }
 
   def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT)
= {
@@ -187,7 +187,7 @@ class SocketServerTest extends JUnitSuite {
     for (_ <- 0 until 10) {
       val request = receiveRequest(server.requestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction))
+      server.requestChannel.sendResponse(new RequestChannel.Response(request, None, RequestChannel.NoOpAction,
None))
     }
   }
 
@@ -510,7 +510,7 @@ class SocketServerTest extends JUnitSuite {
       // detected. If the buffer is larger than 102400 bytes, a second write is attempted
and it fails with an
       // IOException.
       val send = new NetworkSend(request.context.connectionId, ByteBuffer.allocate(550000))
-      channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction))
+      channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, None))
       TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount,
         s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
 


Mime
View raw message