kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Include additional detail in fetch error message (#6036)
Date Mon, 17 Dec 2018 23:51:41 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4039226  MINOR: Include additional detail in fetch error message (#6036)
4039226 is described below

commit 40392266aa6c1b4c5314fd89337fecee768441a2
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon Dec 17 15:51:01 2018 -0800

    MINOR: Include additional detail in fetch error message (#6036)
    
    This patch adds additional information in the log message after a fetch failure to make debugging easier.
    
    Reviewers: David Arthur <mumrah@gmail.com>
---
 .../java/org/apache/kafka/common/requests/FetchRequest.java    |  2 +-
 core/src/main/scala/kafka/api/Request.scala                    | 10 ++++++++++
 core/src/main/scala/kafka/server/ReplicaManager.scala          |  8 ++++++--
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 8d94bfd..b3443a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -233,7 +233,7 @@ public class FetchRequest extends AbstractRequest {
 
         @Override
         public String toString() {
-            return "(offset=" + fetchOffset +
+            return "(fetchOffset=" + fetchOffset +
                     ", logStartOffset=" + logStartOffset +
                     ", maxBytes=" + maxBytes +
                     ", currentLeaderEpoch=" + currentLeaderEpoch +
diff --git a/core/src/main/scala/kafka/api/Request.scala b/core/src/main/scala/kafka/api/Request.scala
index b6ec273..653b5f6 100644
--- a/core/src/main/scala/kafka/api/Request.scala
+++ b/core/src/main/scala/kafka/api/Request.scala
@@ -24,4 +24,14 @@ object Request {
 
   // Broker ids are non-negative int.
   def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0
+
+  def describeReplicaId(replicaId: Int): String = {
+    replicaId match {
+      case OrdinaryConsumerId => "consumer"
+      case DebuggingConsumerId => "debug consumer"
+      case FutureLocalReplicaId => "future local replica"
+      case id if isValidBrokerId(id) => s"replica [$id]"
+      case id => s"invalid replica [$id]"
+    }
+  }
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2f64302..4cc3feb 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -886,13 +886,13 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
       brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
 
+      val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
       try {
        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
           s"remaining response limit $limitBytes" +
           (if (minOneMessage) s", ignoring response/partition size limits" else ""))
 
         val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
-        val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
         val fetchTimeMs = time.milliseconds
 
        // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
@@ -946,7 +946,11 @@ class ReplicaManager(val config: KafkaConfig,
         case e: Throwable =>
           brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
           brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
-          error(s"Error processing fetch operation on partition $tp, offset $offset", e)
+
+          val fetchSource = Request.describeReplicaId(replicaId)
+          error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
+            s"on partition $tp: $fetchInfo", e)
+
           LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                         highWatermark = -1L,
                         leaderLogStartOffset = -1L,


Mime
View raw message