kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-816 Reduce noise in Kafka server logs due to NotLeaderForPartitionException; reviewed by Jun Rao
Date Fri, 22 Mar 2013 16:32:31 GMT
Updated Branches:
  refs/heads/0.8 08b2a37c3 -> 51421fcc0


KAFKA-816 Reduce noise in Kafka server logs due to NotLeaderForPartitionException; reviewed
by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51421fcc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51421fcc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51421fcc

Branch: refs/heads/0.8
Commit: 51421fcc0111031bb77f779a6f6c00520d526a34
Parents: 08b2a37
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Fri Mar 22 09:32:27 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Fri Mar 22 09:32:27 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala |   28 ++++++++++++++++-
 1 files changed, 27 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/51421fcc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cfabfc1..87ca6b0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -197,10 +197,19 @@ class KafkaApis(val requestChannel: RequestChannel,
               .format(messages.size, topicAndPartition.topic, topicAndPartition.partition,
start, end))
         ProduceResult(topicAndPartition, start, end)
       } catch {
+        // NOTE: Failed produce requests is not incremented for UnknownTopicOrPartitionException
and NotLeaderForPartitionException
+        // since failed produce requests metric is supposed to indicate failure of a broker
in handling a produce request
+        // for a partition it is the leader for
         case e: KafkaStorageException =>
           fatal("Halting due to unrecoverable I/O error while handling produce request: ",
e)
           Runtime.getRuntime.halt(1)
           null
+        case utpe: UnknownTopicOrPartitionException =>
+          warn(utpe.getMessage)
+          new ProduceResult(topicAndPartition, utpe)
+        case nle: NotLeaderForPartitionException =>
+          warn(nle.getMessage)
+          new ProduceResult(topicAndPartition, nle)
         case e =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -278,7 +287,16 @@ class KafkaApis(val requestChannel: RequestChannel,
               new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             }
           } catch {
-            case t: Throwable =>
+            // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException
and NotLeaderForPartitionException
+            // since failed fetch requests metric is supposed to indicate failure of a broker
in handling a fetch request
+            // for a partition it is the leader for
+            case utpe: UnknownTopicOrPartitionException =>
+              warn(utpe.getMessage)
+              new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]),
-1L, MessageSet.Empty)
+            case nle: NotLeaderForPartitionException =>
+              warn(nle.getMessage)
+              new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]),
-1L, MessageSet.Empty)
+            case t =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
               error("error when processing request " + (topic, partition, offset, fetchSize),
t)
@@ -344,6 +362,14 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
         (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
       } catch {
+        // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are
special cased since these error messages
+        // are typically transient and there is no value in logging the entire stack trace
for the same
+        case utpe: UnknownTopicOrPartitionException =>
+          warn(utpe.getMessage)
+          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]),
Nil) )
+        case nle: NotLeaderForPartitionException =>
+          warn(nle.getMessage)
+          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]),
Nil) )
         case e =>
           warn("Error while responding to offset request", e)
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
Nil) )


Mime
View raw message