kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1199 Add a reduced access log level; reviewed by Guozhang Wang and Jun Rao
Date Mon, 13 Jan 2014 22:03:45 GMT
Updated Branches:
  refs/heads/trunk f63e3f730 -> d2e2c607d


KAFKA-1199 Add a reduced access log level; reviewed by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2e2c607
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2e2c607
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2e2c607

Branch: refs/heads/trunk
Commit: d2e2c607d1490cf2121dd64f4e79f34ba64c8ed7
Parents: f63e3f7
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Mon Jan 13 13:59:33 2014 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon Jan 13 13:59:33 2014 -0800

----------------------------------------------------------------------
 .../kafka/api/ControlledShutdownRequest.scala   | 16 +++++++-----
 .../kafka/api/ControlledShutdownResponse.scala  |  3 +++
 .../src/main/scala/kafka/api/FetchRequest.scala | 26 +++++++++++---------
 .../scala/kafka/api/LeaderAndIsrRequest.scala   | 23 ++++++++++-------
 .../scala/kafka/api/LeaderAndIsrResponse.scala  |  3 +++
 .../scala/kafka/api/OffsetCommitRequest.scala   | 16 ++++++++++++
 .../scala/kafka/api/OffsetCommitResponse.scala  |  3 +++
 .../scala/kafka/api/OffsetFetchRequest.scala    | 18 +++++++++++++-
 .../scala/kafka/api/OffsetFetchResponse.scala   |  4 ++-
 .../main/scala/kafka/api/OffsetRequest.scala    | 23 ++++++++++-------
 .../main/scala/kafka/api/OffsetResponse.scala   |  1 +
 .../main/scala/kafka/api/ProducerRequest.scala  | 24 +++++++++++-------
 .../main/scala/kafka/api/ProducerResponse.scala |  2 ++
 .../scala/kafka/api/RequestOrResponse.scala     |  8 +++++-
 .../scala/kafka/api/StopReplicaRequest.scala    | 25 +++++++++++--------
 .../scala/kafka/api/StopReplicaResponse.scala   |  2 ++
 .../scala/kafka/api/TopicMetadataRequest.scala  | 21 ++++++++++------
 .../scala/kafka/api/TopicMetadataResponse.scala |  2 ++
 .../scala/kafka/api/UpdateMetadataRequest.scala | 19 ++++++++------
 .../kafka/api/UpdateMetadataResponse.scala      |  2 ++
 .../kafka/javaapi/TopicMetadataRequest.scala    | 25 +++++++++++++++++++
 .../scala/kafka/network/RequestChannel.scala    |  6 ++++-
 22 files changed, 199 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index ad6a20d..7dacb20 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -58,6 +58,15 @@ case class ControlledShutdownRequest(val versionId: Short,
   }
 
   override def toString(): String = {
+    describe(true)
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  override def describe(details: Boolean = false): String = {
     val controlledShutdownRequest = new StringBuilder
     controlledShutdownRequest.append("Name: " + this.getClass.getSimpleName)
     controlledShutdownRequest.append("; Version: " + versionId)
@@ -65,9 +74,4 @@ case class ControlledShutdownRequest(val versionId: Short,
     controlledShutdownRequest.append("; BrokerId: " + brokerId)
     controlledShutdownRequest.toString()
   }
-
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index b7c8448..a80aa49 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -67,4 +67,7 @@ case class ControlledShutdownResponse(override val correlationId: Int,
       buffer.putInt(topicAndPartition.partition)
     }
   }
+
+  override def describe(details: Boolean):String = { toString }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index d41a705..dea118a 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -141,16 +141,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
   def numPartitions = requestInfo.size
 
   override def toString(): String = {
-    val fetchRequest = new StringBuilder
-    fetchRequest.append("Name: " + this.getClass.getSimpleName)
-    fetchRequest.append("; Version: " + versionId)
-    fetchRequest.append("; CorrelationId: " + correlationId)
-    fetchRequest.append("; ClientId: " + clientId)
-    fetchRequest.append("; ReplicaId: " + replicaId)
-    fetchRequest.append("; MaxWait: " + maxWait + " ms")
-    fetchRequest.append("; MinBytes: " + minBytes + " bytes")
-    fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
-    fetchRequest.toString()
+    describe(true)
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
@@ -161,8 +152,21 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
     val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
     requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
   }
-}
 
+  override def describe(details: Boolean): String = {
+    val fetchRequest = new StringBuilder
+    fetchRequest.append("Name: " + this.getClass.getSimpleName)
+    fetchRequest.append("; Version: " + versionId)
+    fetchRequest.append("; CorrelationId: " + correlationId)
+    fetchRequest.append("; ClientId: " + clientId)
+    fetchRequest.append("; ReplicaId: " + replicaId)
+    fetchRequest.append("; MaxWait: " + maxWait + " ms")
+    fetchRequest.append("; MinBytes: " + minBytes + " bytes")
+    if(details)
+      fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+    fetchRequest.toString()
+  }
+}
 
 @nonthreadsafe
 class FetchRequestBuilder() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 3401afa..a984878 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -173,6 +173,18 @@ case class LeaderAndIsrRequest (versionId: Short,
   }
 
   override def toString(): String = {
+    describe(true)
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val responseMap = partitionStateInfos.map {
+      case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    val errorResponse = LeaderAndIsrResponse(correlationId, responseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  override def describe(details: Boolean): String = {
     val leaderAndIsrRequest = new StringBuilder
     leaderAndIsrRequest.append("Name:" + this.getClass.getSimpleName)
     leaderAndIsrRequest.append(";Version:" + versionId)
@@ -180,16 +192,9 @@ case class LeaderAndIsrRequest (versionId: Short,
     leaderAndIsrRequest.append(";ControllerEpoch:" + controllerEpoch)
     leaderAndIsrRequest.append(";CorrelationId:" + correlationId)
     leaderAndIsrRequest.append(";ClientId:" + clientId)
-    leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     leaderAndIsrRequest.append(";Leaders:" + leaders.mkString(","))
+    if(details)
+      leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     leaderAndIsrRequest.toString()
   }
-
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val responseMap = partitionStateInfos.map {
-      case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    }
-    val errorResponse = LeaderAndIsrResponse(correlationId, responseMap)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index 378b2b3..f636444 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -69,4 +69,7 @@ case class LeaderAndIsrResponse(override val correlationId: Int,
       buffer.putShort(value)
     }
   }
+
+  override def describe(details: Boolean):String = { toString }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 1cbe6e8..4d1fa5c 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -106,4 +106,20 @@ case class OffsetCommitRequest(groupId: String,
     val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
+
+  override def describe(details: Boolean): String = {
+    val offsetCommitRequest = new StringBuilder
+    offsetCommitRequest.append("Name: " + this.getClass.getSimpleName)
+    offsetCommitRequest.append("; Version: " + versionId)
+    offsetCommitRequest.append("; CorrelationId: " + correlationId)
+    offsetCommitRequest.append("; ClientId: " + clientId)
+    offsetCommitRequest.append("; GroupId: " + groupId)
+    if(details)
+      offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+    offsetCommitRequest.toString()
+  }
+
+  override def toString(): String = {
+    describe(true)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index ad54bd6..9e1795f 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -74,5 +74,8 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
         2 /* error */
       )
     })
+
+  override def describe(details: Boolean):String = { toString }
+
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index a4c5623..7036532 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -98,4 +98,20 @@ case class OffsetFetchRequest(groupId: String,
     val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
-}
+
+  override def describe(details: Boolean): String = {
+    val offsetFetchRequest = new StringBuilder
+    offsetFetchRequest.append("Name: " + this.getClass.getSimpleName)
+    offsetFetchRequest.append("; Version: " + versionId)
+    offsetFetchRequest.append("; CorrelationId: " + correlationId)
+    offsetFetchRequest.append("; ClientId: " + clientId)
+    offsetFetchRequest.append("; GroupId: " + groupId)
+    if(details)
+      offsetFetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+    offsetFetchRequest.toString()
+  }
+
+  override def toString(): String = {
+    describe(true)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index ce03a13..c1222f4 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -65,7 +65,7 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
     })
   }
 
-  override def sizeInBytes = 
+  override def sizeInBytes =
     4 + /* correlationId */
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
@@ -81,5 +81,7 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
         2 /* error */
       })
     })
+
+  override def describe(details: Boolean):String = { toString }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 0a94a6c..7cbc26c 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -108,14 +108,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
   def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId
 
   override def toString(): String = {
-    val offsetRequest = new StringBuilder
-    offsetRequest.append("Name: " + this.getClass.getSimpleName)
-    offsetRequest.append("; Version: " + versionId)
-    offsetRequest.append("; CorrelationId: " + correlationId)
-    offsetRequest.append("; ClientId: " + clientId)
-    offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
-    offsetRequest.append("; ReplicaId: " + replicaId)
-    offsetRequest.toString()
+    describe(true)
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
@@ -126,4 +119,16 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
     val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
-}
+
+  override def describe(details: Boolean): String = {
+    val offsetRequest = new StringBuilder
+    offsetRequest.append("Name: " + this.getClass.getSimpleName)
+    offsetRequest.append("; Version: " + versionId)
+    offsetRequest.append("; CorrelationId: " + correlationId)
+    offsetRequest.append("; ClientId: " + clientId)
+    offsetRequest.append("; ReplicaId: " + replicaId)
+    if(details)
+      offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+    offsetRequest.toString()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index fca76a2..0e1d6e3 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -94,5 +94,6 @@ case class OffsetResponse(override val correlationId: Int,
     }
   }
 
+  override def describe(details: Boolean):String = { toString }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index c606351..0c295a2 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -123,15 +123,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
   def numPartitions = data.size
 
   override def toString(): String = {
-    val producerRequest = new StringBuilder
-    producerRequest.append("Name: " + this.getClass.getSimpleName)
-    producerRequest.append("; Version: " + versionId)
-    producerRequest.append("; CorrelationId: " + correlationId)
-    producerRequest.append("; ClientId: " + clientId)
-    producerRequest.append("; RequiredAcks: " + requiredAcks)
-    producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
-    producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(","))
-    producerRequest.toString()
+    describe(true)
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
@@ -148,6 +140,20 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
     }
   }
 
+  override def describe(details: Boolean): String = {
+    val producerRequest = new StringBuilder
+    producerRequest.append("Name: " + this.getClass.getSimpleName)
+    producerRequest.append("; Version: " + versionId)
+    producerRequest.append("; CorrelationId: " + correlationId)
+    producerRequest.append("; ClientId: " + clientId)
+    producerRequest.append("; RequiredAcks: " + requiredAcks)
+    producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+    if(details)
+      producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(","))
+    producerRequest.toString()
+  }
+
+
   def emptyData(){
     data.clear()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/ProducerResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index d59c5bb..06261b9 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -87,5 +87,7 @@ case class ProducerResponse(override val correlationId: Int,
       }
     })
   }
+
+  override def describe(details: Boolean):String = { toString }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index ba59c31..708e547 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -30,12 +30,18 @@ object Request {
 }
 
 
-private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging{
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging {
 
   def sizeInBytes: Int
   
   def writeTo(buffer: ByteBuffer): Unit
 
   def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {}
+
+  /* The purpose of this API is to return a string description of the Request mainly for the purpose of request logging.
+  *  This API has no meaning for a Response object.
+   * @param details If this is false, omit the parts of the request description that are proportional to the number of
+   *                topics or partitions. This is mainly to control the amount of request logging. */
+  def describe(details: Boolean):String
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index efd7046..820f0f5 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -98,6 +98,18 @@ case class StopReplicaRequest(versionId: Short,
   }
 
   override def toString(): String = {
+    describe(true)
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val responseMap = partitions.map {
+      case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }.toMap
+    val errorResponse = StopReplicaResponse(correlationId, responseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  override def describe(details: Boolean): String = {
     val stopReplicaRequest = new StringBuilder
     stopReplicaRequest.append("Name: " + this.getClass.getSimpleName)
     stopReplicaRequest.append("; Version: " + versionId)
@@ -106,15 +118,8 @@ case class StopReplicaRequest(versionId: Short,
     stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
     stopReplicaRequest.append("; ControllerId: " + controllerId)
     stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
-    stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
+    if(details)
+      stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
     stopReplicaRequest.toString()
   }
-
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val responseMap = partitions.map {
-      case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    }.toMap
-    val errorResponse = StopReplicaResponse(correlationId, responseMap)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index c82eadd..d7e3630 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -70,4 +70,6 @@ case class StopReplicaResponse(override val correlationId: Int,
       buffer.putShort(value)
     }
   }
+
+  override def describe(details: Boolean):String = { toString }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index c5221c4..a319f2f 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -72,13 +72,7 @@ case class TopicMetadataRequest(val versionId: Short,
   }
 
   override def toString(): String = {
-    val topicMetadataRequest = new StringBuilder
-    topicMetadataRequest.append("Name: " + this.getClass.getSimpleName)
-    topicMetadataRequest.append("; Version: " + versionId)
-    topicMetadataRequest.append("; CorrelationId: " + correlationId)
-    topicMetadataRequest.append("; ClientId: " + clientId)
-    topicMetadataRequest.append("; Topics: " + topics.mkString(","))
-    topicMetadataRequest.toString()
+    describe(true)
   }
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
@@ -88,4 +82,15 @@ case class TopicMetadataRequest(val versionId: Short,
     val errorResponse = TopicMetadataResponse(topicMetadata, correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
-}
+
+  override def describe(details: Boolean): String = {
+    val topicMetadataRequest = new StringBuilder
+    topicMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+    topicMetadataRequest.append("; Version: " + versionId)
+    topicMetadataRequest.append("; CorrelationId: " + correlationId)
+    topicMetadataRequest.append("; ClientId: " + clientId)
+    if(details)
+      topicMetadataRequest.append("; Topics: " + topics.mkString(","))
+    topicMetadataRequest.toString()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index 290f263..f6b7429 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -57,4 +57,6 @@ case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
     val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l) => l})
     brokers.map(b => (b.id, b)).toMap
   }
+
+  override def describe(details: Boolean):String = { toString }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 2ead364..54dd7bd 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -101,6 +101,15 @@ case class UpdateMetadataRequest (versionId: Short,
   }
 
   override def toString(): String = {
+    describe(true)
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]))
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  override def describe(details: Boolean): String = {
     val updateMetadataRequest = new StringBuilder
     updateMetadataRequest.append("Name:" + this.getClass.getSimpleName)
     updateMetadataRequest.append(";Version:" + versionId)
@@ -108,13 +117,9 @@ case class UpdateMetadataRequest (versionId: Short,
     updateMetadataRequest.append(";ControllerEpoch:" + controllerEpoch)
     updateMetadataRequest.append(";CorrelationId:" + correlationId)
     updateMetadataRequest.append(";ClientId:" + clientId)
-    updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     updateMetadataRequest.append(";AliveBrokers:" + aliveBrokers.mkString(","))
+    if(details)
+      updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     updateMetadataRequest.toString()
   }
-
-  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]))
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
index b1e42c3..c583c1f 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
@@ -41,4 +41,6 @@ case class UpdateMetadataResponse(override val correlationId: Int,
     buffer.putInt(correlationId)
     buffer.putShort(errorCode)
   }
+
+  override def describe(details: Boolean):String = { toString }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 05757a1..7e6da16 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -19,6 +19,9 @@ package kafka.javaapi
 import kafka.api._
 import java.nio.ByteBuffer
 import scala.collection.mutable
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
 
 class TopicMetadataRequest(val versionId: Short,
                            override val correlationId: Int,
@@ -41,4 +44,26 @@ class TopicMetadataRequest(val versionId: Short,
 
   def sizeInBytes: Int = underlying.sizeInBytes()
 
+  override def toString(): String = {
+    describe(true)
+  }
+
+  override def describe(details: Boolean): String = {
+    val topicMetadataRequest = new StringBuilder
+    topicMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+    topicMetadataRequest.append("; Version: " + versionId)
+    topicMetadataRequest.append("; CorrelationId: " + correlationId)
+    topicMetadataRequest.append("; ClientId: " + clientId)
+    if(details) {
+      topicMetadataRequest.append("; Topics: ")
+      val topicIterator = topics.iterator()
+      while (topicIterator.hasNext) {
+        val topic = topicIterator.next()
+        topicMetadataRequest.append("%s".format(topic))
+        if(topicIterator.hasNext)
+          topicMetadataRequest.append(",")
+      }
+    }
+    topicMetadataRequest.toString()
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2e2c607/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 330d3a0..a6ec970 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -83,7 +83,11 @@ object RequestChannel extends Logging {
       }
       if(requestLogger.isTraceEnabled)
         requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
-          .format(requestObj, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+          .format(requestObj.describe(true), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+      else if(requestLogger.isDebugEnabled) {
+        requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
+          .format(requestObj.describe(false), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+      }
     }
   }
   


Mime
View raw message