kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: MINOR: Some logging improvements for debugging delayed produce status (#4691)
Date Mon, 19 Mar 2018 11:08:19 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7041e76  MINOR: Some logging improvements for debugging delayed produce status (#4691)
7041e76 is described below

commit 7041e76bd6ef0c28ec8de2d07f2208171745f936
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon Mar 19 04:08:12 2018 -0700

    MINOR: Some logging improvements for debugging delayed produce status (#4691)
    
    A few small logging improvements which help debugging replication issues.
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 29 +++++++++-------------
 .../main/scala/kafka/server/DelayedProduce.scala   | 10 ++++----
 .../scala/kafka/server/LogOffsetMetadata.scala     |  2 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  2 +-
 4 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 68faf00..6eb9611 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -404,22 +404,16 @@ class Partition(val topic: String,
         // keep the current immutable replica list reference
         val curInSyncReplicas = inSyncReplicas
 
-        def numAcks = curInSyncReplicas.count { r =>
-          if (!r.isLocal)
-            if (r.logEndOffset.messageOffset >= requiredOffset) {
-              trace(s"Replica ${r.brokerId} received offset $requiredOffset")
-              true
-            }
-            else
-              false
-          else
-            true /* also count the local (leader) replica */
+        if (isTraceEnabled) {
+          def logEndOffsetString(r: Replica) = s"broker ${r.brokerId}: ${r.logEndOffset.messageOffset}"
+          val (ackedReplicas, awaitingReplicas) = curInSyncReplicas.partition { replica =>
+            replica.logEndOffset.messageOffset >= requiredOffset
+          }
+          trace(s"Progress awaiting ISR acks for offset $requiredOffset: acked: ${ackedReplicas.map(logEndOffsetString)},
" +
+            s"awaiting ${awaitingReplicas.map(logEndOffsetString)}")
         }
 
-        trace(s"$numAcks acks satisfied with acks = -1")
-
         val minIsr = leaderReplica.log.get.config.minInSyncReplicas
-
         if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
           /*
            * The topic may be configured not to accept messages if there are not enough replicas
in ISR
@@ -468,9 +462,10 @@ class Partition(val topic: String,
       leaderReplica.highWatermark = newHighWatermark
       debug(s"High watermark updated to $newHighWatermark")
       true
-    } else  {
-      debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger
than old hw $oldHighWatermark." +
-        s"All LEOs are ${allLogEndOffsets.mkString(",")}")
+    } else {
+      def logEndOffsetString(r: Replica) = s"replica ${r.brokerId}: ${r.logEndOffset}"
+      debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger
than old hw $oldHighWatermark. " +
+        s"All current LEOs are ${assignedReplicas.map(logEndOffsetString)}")
       false
     }
   }
@@ -482,7 +477,7 @@ class Partition(val topic: String,
    */
   def lowWatermarkIfLeader: Long = {
     if (!isLeaderReplicaLocal)
-      throw new NotLeaderForPartitionException("Leader not local for partition %s on broker
%d".format(topicPartition, localBrokerId))
+      throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition
on broker $localBrokerId")
     val logStartOffsets = allReplicas.collect {
       case replica if replicaManager.metadataCache.isBrokerAlive(replica.brokerId) || replica.brokerId
== Request.FutureLocalReplicaId => replica.logStartOffset
     }
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 718ed24..dbecba4 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -34,8 +34,8 @@ import scala.collection._
 case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse)
{
   @volatile var acksPending = false
 
-  override def toString = "[acksPending: %b, error: %d, startOffset: %d, requiredOffset:
%d]"
-    .format(acksPending, responseStatus.error.code, responseStatus.baseOffset, requiredOffset)
+  override def toString = s"[acksPending: $acksPending, error: ${responseStatus.error.code},
" +
+    s"startOffset: ${responseStatus.baseOffset}, requiredOffset: $requiredOffset]"
 }
 
 /**
@@ -44,8 +44,7 @@ case class ProducePartitionStatus(requiredOffset: Long, responseStatus:
Partitio
 case class ProduceMetadata(produceRequiredAcks: Short,
                            produceStatus: Map[TopicPartition, ProducePartitionStatus]) {
 
-  override def toString = "[requiredAcks: %d, partitionStatus: %s]"
-    .format(produceRequiredAcks, produceStatus)
+  override def toString = s"[requiredAcks: $produceRequiredAcks, partitionStatus: $produceStatus]"
 }
 
 /**
@@ -69,7 +68,7 @@ class DelayedProduce(delayMs: Long,
       status.acksPending = false
     }
 
-    trace("Initial partition status for %s is %s".format(topicPartition, status))
+    trace(s"Initial partition status for $topicPartition is $status")
   }
 
   /**
@@ -116,6 +115,7 @@ class DelayedProduce(delayMs: Long,
   override def onExpiration() {
     produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
       if (status.acksPending) {
+        debug(s"Expiring produce request for partition $topicPartition with status $status")
         DelayedProduceMetrics.recordExpiration(topicPartition)
       }
     }
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index edc010e..effbaa0 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -79,6 +79,6 @@ case class LogOffsetMetadata(messageOffset: Long,
     segmentBaseOffset == LogOffsetMetadata.UnknownSegBaseOffset && relativePositionInSegment
== LogOffsetMetadata.UnknownFilePosition
   }
 
-  override def toString = messageOffset.toString + " [" + segmentBaseOffset + " : " + relativePositionInSegment
+ "]"
+  override def toString = s"(offset=$messageOffset segment=[$segmentBaseOffset:$relativePositionInSegment])"
 
 }
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 2fc203a..e96e1ad 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -417,7 +417,7 @@ object DumpLogSegments {
         }
       } else {
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset +
+          print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset +
" count: " + batch.countOrNull +
             " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence
+
             " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch
+
             " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: "
+ batch.isTransactional)

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message