kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: MINOR: Avoid trace logging computation in `checkEnoughReplicasReachOffset`
Date Tue, 31 May 2016 16:03:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3fd9be49a -> fed3f1f88


MINOR: Avoid trace logging computation in `checkEnoughReplicasReachOffset`

`numAcks` is only used in the `trace` logging statement so it should be a `def` instead of
a `val`. Also took the chance to improve the code and documentation a little.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1449 from ijuma/minor-avoid-trace-logging-computation-in-partition


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fed3f1f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fed3f1f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fed3f1f8

Branch: refs/heads/trunk
Commit: fed3f1f8890b219e4247fd9de1305ad18679ff99
Parents: 3fd9be4
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue May 31 09:03:18 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue May 31 09:03:18 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 42 ++++++++++----------
 .../scala/kafka/server/DelayedProduce.scala     | 22 ++++------
 2 files changed, 30 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fed3f1f8/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4e79bdc..ea22e87 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -296,46 +296,48 @@ class Partition(val topic: String,
   }
 
   /*
-   * Note that this method will only be called if requiredAcks = -1
-   * and we are waiting for all replicas in ISR to be fully caught up to
-   * the (local) leader's offset corresponding to this produce request
-   * before we acknowledge the produce request.
+   * Returns a tuple where the first element is a boolean indicating whether enough replicas reached `requiredOffset`
+   * and the second element is an error (which would be `Errors.NONE` for no error).
+   *
+   * Note that this method will only be called if requiredAcks = -1 and we are waiting for all replicas in ISR to be
+   * fully caught up to the (local) leader's offset corresponding to this produce request before we acknowledge the
+   * produce request.
    */
-  def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
+  def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
     leaderReplicaIfLocal() match {
       case Some(leaderReplica) =>
         // keep the current immutable replica list reference
         val curInSyncReplicas = inSyncReplicas
-        val numAcks = curInSyncReplicas.count(r => {
+
+        def numAcks = curInSyncReplicas.count { r =>
           if (!r.isLocal)
             if (r.logEndOffset.messageOffset >= requiredOffset) {
-              trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
+              trace(s"Replica ${r.brokerId} of ${topic}-${partitionId} received offset $requiredOffset")
               true
             }
             else
               false
           else
             true /* also count the local (leader) replica */
-        })
+        }
 
-        trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
+        trace(s"$numAcks acks satisfied for ${topic}-${partitionId} with acks = -1")
 
         val minIsr = leaderReplica.log.get.config.minInSyncReplicas
 
-        if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
+        if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
           /*
-          * The topic may be configured not to accept messages if there are not enough replicas in ISR
-          * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
-          */
-          if (minIsr <= curInSyncReplicas.size) {
-            (true, Errors.NONE.code)
-          } else {
-            (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
-          }
+           * The topic may be configured not to accept messages if there are not enough replicas in ISR
+           * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
+           */
+          if (minIsr <= curInSyncReplicas.size)
+            (true, Errors.NONE)
+          else
+            (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
         } else
-          (false, Errors.NONE.code)
+          (false, Errors.NONE)
       case None =>
-        (false, Errors.NOT_LEADER_FOR_PARTITION.code)
+        (false, Errors.NOT_LEADER_FOR_PARTITION)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fed3f1f8/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index be1be4f..5a59d3b 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -82,32 +82,26 @@ class DelayedProduce(delayMs: Long,
   override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
     produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
-      trace("Checking produce satisfaction for %s, current status %s"
-        .format(topicAndPartition, status))
+      trace(s"Checking produce satisfaction for ${topicAndPartition}, current status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
-        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
-        val (hasEnough, errorCode) = partitionOpt match {
+        val (hasEnough, error) = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
           case Some(partition) =>
             partition.checkEnoughReplicasReachOffset(status.requiredOffset)
           case None =>
             // Case A
-            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         }
-        if (errorCode != Errors.NONE.code) {
-          // Case B.1
+        // Case B.1 || B.2
+        if (error != Errors.NONE || hasEnough) {
           status.acksPending = false
-          status.responseStatus.errorCode = errorCode
-        } else if (hasEnough) {
-          // Case B.2
-          status.acksPending = false
-          status.responseStatus.errorCode = Errors.NONE.code
+          status.responseStatus.errorCode = error.code
         }
       }
     }
 
-    // check if each partition has satisfied at lease one of case A and case B
-    if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
+    // check if every partition has satisfied at least one of case A or B
+    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
       forceComplete()
     else
       false


Mime
View raw message