This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 883e91f MINOR: Include additional detail in fetch error message (#6036)
883e91f is described below
commit 883e91f49f179eacfa8ed68ea74b82c647f5a8a6
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon Dec 17 15:51:01 2018 -0800
MINOR: Include additional detail in fetch error message (#6036)
This patch adds additional information in the log message after a fetch failure to make
debugging easier.
Reviewers: David Arthur <mumrah@gmail.com>
---
.../java/org/apache/kafka/common/requests/FetchRequest.java | 2 +-
core/src/main/scala/kafka/api/Request.scala | 10 ++++++++++
core/src/main/scala/kafka/server/ReplicaManager.scala | 8 ++++++--
3 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 8d94bfd..b3443a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -233,7 +233,7 @@ public class FetchRequest extends AbstractRequest {
@Override
public String toString() {
- return "(offset=" + fetchOffset +
+ return "(fetchOffset=" + fetchOffset +
", logStartOffset=" + logStartOffset +
", maxBytes=" + maxBytes +
", currentLeaderEpoch=" + currentLeaderEpoch +
diff --git a/core/src/main/scala/kafka/api/Request.scala b/core/src/main/scala/kafka/api/Request.scala
index b6ec273..653b5f6 100644
--- a/core/src/main/scala/kafka/api/Request.scala
+++ b/core/src/main/scala/kafka/api/Request.scala
@@ -24,4 +24,14 @@ object Request {
// Broker ids are non-negative int.
def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0
+
+ def describeReplicaId(replicaId: Int): String = {
+ replicaId match {
+ case OrdinaryConsumerId => "consumer"
+ case DebuggingConsumerId => "debug consumer"
+ case FutureLocalReplicaId => "future local replica"
+ case id if isValidBrokerId(id) => s"replica [$id]"
+ case id => s"invalid replica [$id]"
+ }
+ }
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index cf861ee..84b2d48 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -885,13 +885,13 @@ class ReplicaManager(val config: KafkaConfig,
brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
+ val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
try {
trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
s"remaining response limit $limitBytes" +
(if (minOneMessage) s", ignoring response/partition size limits" else ""))
val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
- val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
val fetchTimeMs = time.milliseconds
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
@@ -945,7 +945,11 @@ class ReplicaManager(val config: KafkaConfig,
case e: Throwable =>
brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
- error(s"Error processing fetch operation on partition $tp, offset $offset", e)
+
+ val fetchSource = Request.describeReplicaId(replicaId)
+ error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
+ s"on partition $tp: $fetchInfo", e)
+
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
highWatermark = -1L,
leaderLogStartOffset = -1L,