kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-649; Cleanup log4j logging (extra); patched by Jun Rao; reviewed by Neha Narkhede
Date Wed, 29 May 2013 17:00:59 GMT
Updated Branches:
  refs/heads/0.8 312ed2e67 -> e4f287db6


kafka-649; Cleanup log4j logging (extra); patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e4f287db
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e4f287db
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e4f287db

Branch: refs/heads/0.8
Commit: e4f287db6142cafdf1ddb4ebf7110180ec06f7c4
Parents: 312ed2e
Author: Jun Rao <junrao@gmail.com>
Authored: Wed May 29 10:00:51 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed May 29 10:00:51 2013 -0700

----------------------------------------------------------------------
 .../kafka/controller/PartitionLeaderSelector.scala |    7 +++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |   22 +++++++++-----
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    8 +++---
 3 files changed, 22 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 21b0e24..a47b142 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -63,13 +63,14 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext)
exten
               case false =>
                 ControllerStats.uncleanLeaderElectionRate.mark()
                 val newLeader = liveAssignedReplicasToThisPartition.head
-                warn("No broker in ISR is alive for %s. Elect leader from broker %s. There's
potential data loss."
-                     .format(topicAndPartition, newLeader))
+                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers
%s. There's potential data loss."
+                     .format(topicAndPartition, newLeader, liveAssignedReplicasToThisPartition.mkString(",")))
                 new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion
+ 1)
             }
           case false =>
             val newLeader = liveBrokersInIsr.head
-            debug("Some broker in ISR is alive for %s. Select %d from ISR to be the leader.".format(topicAndPartition,
newLeader))
+            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
+                  .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList,
currentLeaderIsrZkPathVersion + 1)
         }
         info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(),
topicAndPartition))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 93e2f04..dd88ccd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -239,16 +239,18 @@ class KafkaApis(val requestChannel: RequestChannel,
           Runtime.getRuntime.halt(1)
           null
         case utpe: UnknownTopicOrPartitionException =>
-          warn("Produce request: " + utpe.getMessage)
+          warn("Produce request with correlation id %d from client %s on partition %s failed
due to %s".format(
+               producerRequest.correlationId, producerRequest.clientId, topicAndPartition,
utpe.getMessage))
           new ProduceResult(topicAndPartition, utpe)
         case nle: NotLeaderForPartitionException =>
-          warn("Produce request: " + nle.getMessage)
+          warn("Produce request with correlation id %d from client %s on partition %s failed
due to %s".format(
+               producerRequest.correlationId, producerRequest.clientId, topicAndPartition,
nle.getMessage))
           new ProduceResult(topicAndPartition, nle)
         case e =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
-          error("Error processing ProducerRequest with correlation id %d from client %s on
%s:%d"
-            .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition.topic,
topicAndPartition.partition), e)
+          error("Error processing ProducerRequest with correlation id %d from client %s on
partition %s"
+            .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition),
e)
           new ProduceResult(topicAndPartition, e)
        }
     }
@@ -326,10 +328,12 @@ class KafkaApis(val requestChannel: RequestChannel,
             // since failed fetch requests metric is supposed to indicate failure of a broker
in handling a fetch request
             // for a partition it is the leader for
             case utpe: UnknownTopicOrPartitionException =>
-              warn("Fetch request: " + utpe.getMessage)
+              warn("Fetch request with correlation id %d from client %s on partition [%s,%d]
failed due to %s".format(
+                   fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
               new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]),
-1L, MessageSet.Empty)
             case nle: NotLeaderForPartitionException =>
-              warn("Fetch request: " + nle.getMessage)
+              warn("Fetch request with correlation id %d from client %s on partition [%s,%d]
failed due to %s".format(
+                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
               new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]),
-1L, MessageSet.Empty)
             case t =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
@@ -402,10 +406,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are
special cased since these error messages
         // are typically transient and there is no value in logging the entire stack trace
for the same
         case utpe: UnknownTopicOrPartitionException =>
-          warn(utpe.getMessage)
+          warn("Offset request with correlation id %d from client %s on partition %s failed
due to %s".format(
+               offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage))
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]),
Nil) )
         case nle: NotLeaderForPartitionException =>
-          warn(nle.getMessage)
+          warn("Offset request with correlation id %d from client %s on partition %s failed
due to %s".format(
+               offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]),
Nil) )
         case e =>
           warn("Error while responding to offset request", e)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4f6fcd4..3775eb4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -333,8 +333,8 @@ object ZkUtils extends Logging {
       (true, stat.getVersion)
     } catch {
       case e: Exception =>
-        error("Conditional update of path %s with data %s and expected version %d failed".format(path,
data,
-          expectVersion), e)
+        error("Conditional update of path %s with data %s and expected version %d failed
due to %s".format(path, data,
+          expectVersion, e.getMessage))
         (false, -1)
     }
   }
@@ -352,8 +352,8 @@ object ZkUtils extends Logging {
     } catch {
       case nne: ZkNoNodeException => throw nne
       case e: Exception =>
-        error("Conditional update of path %s with data %s and expected version %d failed".format(path,
data,
-          expectVersion), e)
+        error("Conditional update of path %s with data %s and expected version %d failed
due to %s".format(path, data,
+          expectVersion, e.getMessage))
         (false, -1)
     }
   }


Mime
View raw message