kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-2821; fix deadlock in group metadata write callback
Date Fri, 13 Nov 2015 19:11:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4efe4ac6d -> 002ec9c79


KAFKA-2821; fix deadlock in group metadata write callback

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #519 from hachikuji/KAFKA-2821


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/002ec9c7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/002ec9c7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/002ec9c7

Branch: refs/heads/trunk
Commit: 002ec9c796acf8570754522a4eb1b0c3ee9aab13
Parents: 4efe4ac
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Nov 13 11:11:56 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Nov 13 11:11:56 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    | 61 +++++++++++++++-----
 .../kafka/coordinator/GroupCoordinator.scala    | 25 +++++---
 .../coordinator/GroupMetadataManager.scala      | 46 ++++++++-------
 3 files changed, 87 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/002ec9c7/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 59d025b..70c8d99 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -161,8 +161,8 @@ class Partition(val topic: String,
    *  and setting the new leader and ISR
    */
   def makeLeader(controllerId: Int,
-                 partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
-    inWriteLock(leaderIsrUpdateLock) {
+                 partitionStateInfo: PartitionStateInfo, correlationId: Int) {
+    val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -186,8 +186,11 @@ class Partition(val topic: String,
         if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult))
       // we may need to increment high watermark since ISR could be down to 1
       maybeIncrementLeaderHW(newLeaderReplica)
-      true
     }
+
+    // some delayed operations may be unblocked after HW changed
+    if (leaderHWIncremented)
+      tryCompleteDelayedRequests()
   }
 
   /**
@@ -255,7 +258,7 @@ class Partition(val topic: String,
    * This function can be triggered when a replica's LEO has incremented
    */
   def maybeExpandIsr(replicaId: Int) {
-    inWriteLock(leaderIsrUpdateLock) {
+    val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
@@ -277,9 +280,13 @@ class Partition(val topic: String,
           // since the replica maybe now be in the ISR and its LEO has just incremented
           maybeIncrementLeaderHW(leaderReplica)
 
-        case None => // nothing to do if no longer leader
+        case None => false // nothing to do if no longer leader
       }
     }
+
+    // some delayed operations may be unblocked after HW changed
+    if (leaderHWIncremented)
+      tryCompleteDelayedRequests()
   }
 
   /*
@@ -333,28 +340,36 @@ class Partition(val topic: String,
    * 1. Partition ISR changed
    * 2. Any replica's LEO changed
    *
+   * Returns true if the HW was incremented, and false otherwise.
    * Note There is no need to acquire the leaderIsrUpdate lock here
    * since all callers of this private API acquire that lock
    */
-  private def maybeIncrementLeaderHW(leaderReplica: Replica) {
+  private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = {
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
     if(oldHighWatermark.precedes(newHighWatermark)) {
       leaderReplica.highWatermark = newHighWatermark
       debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId,
newHighWatermark))
-      // some delayed operations may be unblocked after HW changed
-      val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
-      replicaManager.tryCompleteDelayedFetch(requestKey)
-      replicaManager.tryCompleteDelayedProduce(requestKey)
+      true
     } else {
       debug("Skipping update high watermark since Old hw %s is larger than new hw %s for
partition [%s,%d]. All leo's are %s"
         .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
+      false
     }
   }
 
+  /**
+   * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
+   */
+  private def tryCompleteDelayedRequests() {
+    val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
+    replicaManager.tryCompleteDelayedFetch(requestKey)
+    replicaManager.tryCompleteDelayedProduce(requestKey)
+  }
+
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
-    inWriteLock(leaderIsrUpdateLock) {
+    val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
@@ -366,12 +381,20 @@ class Partition(val topic: String,
             // update ISR in zk and in cache
             updateIsr(newInSyncReplicas)
             // we may need to increment high watermark since ISR could be down to 1
-            maybeIncrementLeaderHW(leaderReplica)
+
             replicaManager.isrShrinkRate.mark()
+            maybeIncrementLeaderHW(leaderReplica)
+          } else {
+            false
           }
-        case None => // do nothing if no longer leader
+
+        case None => false // do nothing if no longer leader
       }
     }
+
+    // some delayed operations may be unblocked after HW changed
+    if (leaderHWIncremented)
+      tryCompleteDelayedRequests()
   }
 
   def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
@@ -397,7 +420,7 @@ class Partition(val topic: String,
   }
 
   def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
-    inReadLock(leaderIsrUpdateLock) {
+    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
       val leaderReplicaOpt = leaderReplicaIfLocal()
       leaderReplicaOpt match {
         case Some(leaderReplica) =>
@@ -415,13 +438,19 @@ class Partition(val topic: String,
           // probably unblock some follower fetch requests since log end offset has been updated
           replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
-          maybeIncrementLeaderHW(leaderReplica)
-          info
+          (info, maybeIncrementLeaderHW(leaderReplica))
+
         case None =>
           throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
             .format(topic, partitionId, localBrokerId))
       }
     }
+
+    // some delayed operations may be unblocked after HW changed
+    if (leaderHWIncremented)
+      tryCompleteDelayedRequests()
+
+    info
   }
 
   private def updateIsr(newIsr: Set[Replica]) {

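The Partition.scala changes above all follow one shape: whether the high watermark advanced is still decided inside leaderIsrUpdateLock, but the boolean result is carried out of the lock and tryCompleteDelayedRequests() only runs after the lock has been released (the new doc comment is explicit that it must not be called while holding the lock). Below is a rough, self-contained sketch of that shape with invented stand-in types in place of the real Partition/Replica/ReplicaManager classes; it illustrates the locking pattern only, not the actual Kafka code.

import java.util.concurrent.locks.ReentrantReadWriteLock

// Sketch of the "decide under the lock, act after the lock" shape used in
// Partition.scala above. Offsets are plain Longs here instead of
// LogOffsetMetadata, and delayed-request completion is just a println.
class PartitionSketch {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
  private var highWatermark: Long = 0L

  private def inWriteLock[T](body: => T): T = {
    val lock = leaderIsrUpdateLock.writeLock()
    lock.lock()
    try body finally lock.unlock()
  }

  // stands in for maybeIncrementLeaderHW: returns true iff the HW advanced
  private def maybeIncrementLeaderHW(logEndOffsets: Seq[Long]): Boolean = {
    val newHighWatermark = logEndOffsets.min
    if (newHighWatermark > highWatermark) {
      highWatermark = newHighWatermark
      true
    } else false
  }

  // stands in for tryCompleteDelayedRequests; must not run while the lock is
  // held, since completing a delayed operation can call back into code that
  // takes other locks
  private def tryCompleteDelayedRequests(): Unit =
    println(s"completing delayed requests, hw = $highWatermark")

  // same shape as maybeExpandIsr / maybeShrinkIsr / appendMessagesToLeader above
  def maybeExpandIsr(logEndOffsets: Seq[Long]): Unit = {
    val leaderHWIncremented = inWriteLock {
      maybeIncrementLeaderHW(logEndOffsets)
    }
    if (leaderHWIncremented)
      tryCompleteDelayedRequests()
  }
}

object PartitionSketch extends App {
  new PartitionSketch().maybeExpandIsr(Seq(5L, 7L)) // prints once, hw = 5
}

Completing a delayed produce can invoke a produce callback that takes its own locks (for the offsets topic, the coordinator callbacks below re-enter the group lock), which is presumably why the completion has to happen outside leaderIsrUpdateLock.
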
http://git-wip-us.apache.org/repos/asf/kafka/blob/002ec9c7/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index e9c3c01..23309c1 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.log.LogConfig
-import kafka.message.UncompressedCodec
+import kafka.message.{Message, UncompressedCodec}
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.protocol.Errors
@@ -254,6 +254,8 @@ class GroupCoordinator(val brokerId: Int,
                           memberId: String,
                           groupAssignment: Map[String, Array[Byte]],
                           responseCallback: SyncCallback) {
+    var delayedGroupStore: Option[DelayedStore] = None
+
     group synchronized {
       if (!group.has(memberId)) {
         responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
@@ -279,9 +281,7 @@ class GroupCoordinator(val brokerId: Int,
               val missing = group.allMembers -- groupAssignment.keySet
               val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
 
-              // persist the group metadata and upon finish transition to stable and propagate the assignment
-              val generationId = group.generationId
-              groupManager.storeGroup(group, assignment, (errorCode: Short) => {
+              delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (errorCode: Short) => {
                 group synchronized {
                   // another member may have joined the group while we were awaiting this callback,
                   // so we must ensure we are still in the AwaitingSync state and the same generation
@@ -296,7 +296,7 @@ class GroupCoordinator(val brokerId: Int,
                     }
                   }
                 }
-              })
+              }))
             }
 
           case Stable =>
@@ -307,6 +307,10 @@ class GroupCoordinator(val brokerId: Int,
         }
       }
     }
+
+    // store the group metadata without holding the group lock to avoid the potential
+    // for deadlock when the callback is invoked
+    delayedGroupStore.foreach(groupManager.store)
   }
 
   def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
@@ -385,6 +389,8 @@ class GroupCoordinator(val brokerId: Int,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+    var delayedOffsetStore: Option[DelayedStore] = None
+
     if (!isActive.get) {
       responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
     } else if (!isCoordinatorForGroup(groupId)) {
@@ -396,7 +402,8 @@ class GroupCoordinator(val brokerId: Int,
       if (group == null) {
         if (generationId < 0)
           // the group is not relying on Kafka for partition management, so allow the commit
-          groupManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
+          delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata,
+            responseCallback))
         else
           // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
           // or this is a request coming from an older generation. either way, reject the commit
@@ -412,11 +419,15 @@ class GroupCoordinator(val brokerId: Int,
           } else if (generationId != group.generationId) {
             responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
           } else {
-            groupManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
+            delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,
+              offsetMetadata, responseCallback))
           }
         }
       }
     }
+
+    // store the offsets without holding the group lock
+    delayedOffsetStore.foreach(groupManager.store)
   }
 
   def handleFetchOffsets(groupId: String,

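On the coordinator side, the group synchronized blocks above now only validate state and capture the pending write in a local Option[DelayedStore]; the append that eventually fires the write callback runs after the monitor is released, so the callback can re-enter group synchronized without deadlocking (the added comments say as much). A minimal sketch of that call pattern follows, using invented stand-in names rather than the real GroupCoordinator/GroupMetadataManager APIs.

// Sketch of the caller-side pattern from handleSyncGroup / handleCommitOffsets:
// capture the pending store while holding the group's monitor, run it afterwards.
object CoordinatorSketch {
  // stand-in for kafka.coordinator.DelayedStore
  final case class DelayedStore(run: () => Unit)

  final class Group {
    var state: String = "AwaitingSync"
  }

  // stand-in for groupManager.prepareStoreGroup: builds the work, does not block
  def prepareStoreGroup(group: Group, callback: Short => Unit): DelayedStore =
    DelayedStore(() => callback(0)) // pretend the append always succeeds

  // stand-in for groupManager.store: the blocking append that invokes the callback
  def store(delayed: DelayedStore): Unit = delayed.run()

  def handleSyncGroup(group: Group, responseCallback: Short => Unit): Unit = {
    var delayedGroupStore: Option[DelayedStore] = None

    group.synchronized {
      if (group.state == "AwaitingSync")
        delayedGroupStore = Some(prepareStoreGroup(group, errorCode => {
          // the callback takes the group lock again; that is safe only because
          // store() is invoked below, outside the synchronized block
          group.synchronized {
            group.state = if (errorCode == 0) "Stable" else "AwaitingSync"
            responseCallback(errorCode)
          }
        }))
    }

    // perform the write without holding the group lock
    delayedGroupStore.foreach(store)
  }

  def main(args: Array[String]): Unit = {
    val group = new Group
    handleSyncGroup(group, err => println(s"sync completed with error code $err"))
    println(group.synchronized(group.state)) // prints "Stable"
  }
}

The key detail is that the prepare step does no I/O and invokes nothing; everything that can block or call back is deferred to store(), which the handler runs with no locks held.
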
http://git-wip-us.apache.org/repos/asf/kafka/blob/002ec9c7/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 047970e..4ac2c7a 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -46,6 +46,10 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import com.yammer.metrics.core.Gauge
 
+
+case class DelayedStore(messageSet: Map[TopicAndPartition, MessageSet],
+                        callback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+
 class GroupMetadataManager(val brokerId: Int,
                            val config: OffsetConfig,
                            replicaManager: ReplicaManager,
@@ -165,9 +169,9 @@ class GroupMetadataManager(val brokerId: Int,
     }
   }
 
-  def storeGroup(group: GroupMetadata,
-                 groupAssignment: Map[String, Array[Byte]],
-                 responseCallback: Short => Unit) {
+  def prepareStoreGroup(group: GroupMetadata,
+                        groupAssignment: Map[String, Array[Byte]],
+                        responseCallback: Short => Unit): DelayedStore = {
     // construct the message to append
     val message = new Message(
       key = GroupMetadataManager.groupMetadataKey(group.groupId),
@@ -179,6 +183,8 @@ class GroupMetadataManager(val brokerId: Int,
     val groupMetadataMessageSet = Map(groupMetadataPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
 
+    val generationId = group.generationId
+
     // set the callback function to insert the created group into cache after log append completed
     def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
       // the append response should only contain the topics partition
@@ -193,7 +199,7 @@ class GroupMetadataManager(val brokerId: Int,
       var responseCode = Errors.NONE.code
       if (status.error != ErrorMapping.NoError) {
         debug("Metadata from group %s with generation %d failed when appending to log due
to %s"
-          .format(group.groupId, group.generationId, ErrorMapping.exceptionNameFor(status.error)))
+          .format(group.groupId, generationId, ErrorMapping.exceptionNameFor(status.error)))
 
         // transform the log append error code to the corresponding the commit status error code
         responseCode = if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) {
@@ -205,13 +211,13 @@ class GroupMetadataManager(val brokerId: Int,
           || status.error == ErrorMapping.InvalidFetchSizeCode) {
 
           error("Appending metadata message for group %s generation %d failed due to %s,
returning UNKNOWN error code to the client"
-            .format(group.groupId, group.generationId, ErrorMapping.exceptionNameFor(status.error)))
+            .format(group.groupId, generationId, ErrorMapping.exceptionNameFor(status.error)))
 
           Errors.UNKNOWN.code
         } else {
 
           error("Appending metadata message for group %s generation %d failed due to unexpected
error: %s"
-            .format(group.groupId, group.generationId, status.error))
+            .format(group.groupId, generationId, status.error))
 
           status.error
         }
@@ -220,25 +226,27 @@ class GroupMetadataManager(val brokerId: Int,
       responseCallback(responseCode)
     }
 
+    DelayedStore(groupMetadataMessageSet, putCacheCallback)
+  }
+
+  def store(delayedAppend: DelayedStore) {
     // call replica manager to append the group message
     replicaManager.appendMessages(
       config.offsetCommitTimeoutMs.toLong,
       config.offsetCommitRequiredAcks,
       true, // allow appending to internal offset topic
-      groupMetadataMessageSet,
-      putCacheCallback)
+      delayedAppend.messageSet,
+      delayedAppend.callback)
   }
 
-
-
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
    */
-  def storeOffsets(groupId: String,
-                   consumerId: String,
-                   generationId: Int,
-                   offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
-                   responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+  def prepareStoreOffsets(groupId: String,
+                          consumerId: String,
+                          generationId: Int,
+                          offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
+                          responseCallback: immutable.Map[TopicAndPartition, Short] => Unit): DelayedStore = {
     // first filter out partitions with offset metadata size exceeding limit
     val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
@@ -304,13 +312,7 @@ class GroupMetadataManager(val brokerId: Int,
       responseCallback(commitStatus)
     }
 
-    // call replica manager to append the offset messages
-    replicaManager.appendMessages(
-      config.offsetCommitTimeoutMs.toLong,
-      config.offsetCommitRequiredAcks,
-      true, // allow appending to internal offset topic
-      offsetsAndMetadataMessageSet,
-      putCacheCallback)
+    DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
   }
 
   /**


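GroupMetadataManager is split to match: prepareStoreGroup and prepareStoreOffsets build the message set plus the cache-update callback and return them as a DelayedStore, while store(delayedAppend) hands both to replicaManager.appendMessages. The sketch below shows that two-phase shape with heavily simplified stand-in types (String partition keys, Short error codes), not the real signatures.

// Two-phase store, as in GroupMetadataManager above: prepare...() builds the
// data and the callback without doing any I/O; store() performs the append.
object MetadataManagerSketch {
  final case class DelayedStore(messages: Map[String, Array[Byte]],
                                callback: Map[String, Short] => Unit)

  // stand-in for replicaManager.appendMessages: append, then report per-partition status
  private def appendMessages(messages: Map[String, Array[Byte]],
                             callback: Map[String, Short] => Unit): Unit =
    callback(messages.map { case (partition, _) => partition -> 0.toShort })

  private object Errors { val UNKNOWN: Short = -1 }

  // phase 1: build the messages and the cache-update callback; nothing blocks here
  def prepareStoreGroup(groupId: String,
                        assignment: Array[Byte],
                        responseCallback: Short => Unit): DelayedStore = {
    val messages = Map(s"__consumer_offsets-$groupId" -> assignment)
    def putCacheCallback(status: Map[String, Short]): Unit =
      responseCallback(status.values.headOption.getOrElse(Errors.UNKNOWN))
    DelayedStore(messages, putCacheCallback)
  }

  // phase 2: the blocking part, called by the coordinator outside its group lock
  def store(delayed: DelayedStore): Unit =
    appendMessages(delayed.messages, delayed.callback)

  def main(args: Array[String]): Unit = {
    val delayed = prepareStoreGroup("my-group", Array[Byte](1, 2, 3),
      errorCode => println(s"stored with error code $errorCode"))
    store(delayed)
  }
}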