kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5202: Handle topic deletion while trying to send txn markers
Date Tue, 30 May 2017 21:57:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 1e963d04d -> d834e2aa9


KAFKA-5202: Handle topic deletion while trying to send txn markers

Here is a sketch of this proposal:

1. When it is time to send the txn markers, look up the leader node of the partition
only once instead of retrying; if that information is not available, the partition has
very likely been removed, since it was in the cache before. In this case, we simply remove
the partition from the metadata object and skip putting it into the corresponding queue, and
if no partition has an available leader broker, we complete this delayed operation to proceed
to write the complete txn log entry.

2. If the leader id is known from the cache but the corresponding node object with the listener
name is not available, the leader is likely unavailable right now. Put the marker into
a separate queue and let the sender thread retry fetching its metadata each time it drains
the queue.

One caveat of this approach is the delete-and-recreate case; the argument is that since
all the messages are deleted anyway when the topic-partition is deleted, it does not matter
whether the markers make it onto the log partitions or not.
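
To make the dispatch rule concrete, here is a minimal, self-contained sketch of steps 1 and 2
under a simplified model of the metadata cache. All names below (TxnMarkerDispatchSketch,
dispatch, the callback parameters) are illustrative stand-ins, not the actual Kafka classes:

object TxnMarkerDispatchSketch {
  case class TopicPartition(topic: String, partition: Int)
  case class Node(id: Int)
  val NoNode = Node(-1) // stand-in for Node.noNode

  // Stand-in for the MetadataCache.getPartitionLeaderEndpoint contract:
  //   None         -> leader unknown (partition very likely deleted)
  //   Some(NoNode) -> leader id known, but its node is currently unavailable
  //   Some(node)   -> leader known and reachable
  def dispatch(partitions: Set[TopicPartition],
               leaderOf: TopicPartition => Option[Node],
               enqueueForBroker: (Node, Set[TopicPartition]) => Unit,
               enqueueForUnknownBroker: Set[TopicPartition] => Unit,
               skipLikelyDeleted: Set[TopicPartition] => Unit): Unit = {
    // Look up each partition's leader exactly once and group by the result.
    partitions.groupBy(leaderOf).foreach {
      case (Some(NoNode), tps) =>
        // Step 2: park the markers; the sender thread re-resolves the leader
        // each time it drains this queue.
        enqueueForUnknownBroker(tps)
      case (Some(node), tps) =>
        enqueueForBroker(node, tps)
      case (None, tps) =>
        // Step 1: no leader in the cache, so the partition was very likely
        // deleted; drop it so the delayed operation can complete.
        skipLikelyDeleted(tps)
    }
  }
}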

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Apurva Mehta <apurva@confluent.io>, Damian Guy <damian.guy@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #3130 from guozhangwang/K5202-handle-topic-deletion

(cherry picked from commit 80223b14ee092e95d05b40b12631df2d6db7ef53)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d834e2aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d834e2aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d834e2aa

Branch: refs/heads/0.11.0
Commit: d834e2aa9712461bf6a3c304ec2dee62217e3d19
Parents: 1e963d0
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue May 30 14:35:51 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue May 30 14:57:39 2017 -0700

----------------------------------------------------------------------
 .../TransactionMarkerChannelManager.scala       |  90 +++++++++---
 .../src/main/scala/kafka/server/KafkaApis.scala |   2 +-
 .../main/scala/kafka/server/MetadataCache.scala |  26 ++--
 .../scala/kafka/server/ReplicaManager.scala     |   1 -
 .../TransactionMarkerChannelManagerTest.scala   | 146 ++++++++++++-------
 5 files changed, 178 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d834e2aa/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 461867d..344863f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -128,10 +128,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
  private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = concurrent.TrieMap.empty[Int, TxnMarkerQueue]
 
-  private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
+  private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode)
 
-  // TODO: What is reasonable for this
-  private val brokerNotAliveBackoffMs = 10
+  private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
 
   private val txnMarkerSendThread: InterBrokerSendThread = {
    new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, drainQueuedTransactionMarkers, time)
@@ -156,6 +155,9 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   }
 
   // visible for testing
+  private[transaction] def queueForUnknownBroker = markersQueueForUnknownBroker
+
+  // visible for testing
   private[transaction] def senderThread = txnMarkerSendThread
 
  private[transaction] def addMarkersForBroker(broker: Node, txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry) {
@@ -171,6 +173,22 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   }
 
  private[transaction] def drainQueuedTransactionMarkers(): Iterable[RequestAndCompletionHandler] = {
+    val txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]()
+    markersQueueForUnknownBroker.forEachTxnTopicPartition { case (_, queue) =>
+      queue.drainTo(txnIdAndMarkerEntries)
+    }
+
+    for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries.asScala) {
+      val transactionalId = txnIdAndMarker.txnId
+      val producerId = txnIdAndMarker.txnMarkerEntry.producerId
+      val producerEpoch = txnIdAndMarker.txnMarkerEntry.producerEpoch
+      val txnResult = txnIdAndMarker.txnMarkerEntry.transactionResult
+      val coordinatorEpoch = txnIdAndMarker.txnMarkerEntry.coordinatorEpoch
+      val topicPartitions = txnIdAndMarker.txnMarkerEntry.partitions.asScala.toSet
+
+      addTxnMarkersToBrokerQueue(transactionalId, producerId, producerEpoch, txnResult, coordinatorEpoch, topicPartitions)
+    }
+
    markersQueuePerBroker.map { case (brokerId: Int, brokerRequestQueue: TxnMarkerQueue) =>
       val txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]()
       brokerRequestQueue.forEachTxnTopicPartition { case (_, queue) =>
@@ -254,31 +272,65 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                                  result: TransactionResult, coordinatorEpoch: Int,
                                 topicPartitions: immutable.Set[TopicPartition]): Unit = {
     val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
-    val partitionsByDestination: immutable.Map[Node, immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
-      var brokerNode: Option[Node] = None
+    val partitionsByDestination: immutable.Map[Option[Node], immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
+      metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
+    }
 
-      // TODO: instead of retry until succeed, we can first put it into an unknown broker queue and let the sender thread to look for its broker and migrate them
-      while (brokerNode.isEmpty) {
-        brokerNode = metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
+    for ((broker: Option[Node], topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
+      broker match {
+        case Some(brokerNode) =>
+          val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
+          val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
+
+          if (brokerNode == Node.noNode) {
+            // if the leader of the partition is known but node not available, put it into an unknown broker queue
+            // and let the sender thread to look for its broker and migrate them later
+            markersQueueForUnknownBroker.addMarkers(txnTopicPartition, txnIdAndMarker)
+          } else {
+            addMarkersForBroker(brokerNode, txnTopicPartition, txnIdAndMarker)
+          }
 
-        if (brokerNode.isEmpty) {
-          trace(s"Couldn't find leader endpoint for partition: $topicPartition, retrying.")
-          time.sleep(brokerNotAliveBackoffMs)
-        }
-      }
-      brokerNode.get
-    }
+        case None =>
+          txnStateManager.getAndMaybeAddTransactionState(transactionalId) match {
+            case Left(error) =>
+              info(s"Encountered $error trying to fetch transaction metadata for $transactionalId
with coordinator epoch $coordinatorEpoch; cancel sending markers to its partition leaders")
+              txnMarkerPurgatory.cancelForKey(transactionalId)
 
-    for ((broker: Node, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination)
{
-      val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result,
topicPartitions.toList.asJava)
-      val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
-      addMarkersForBroker(broker, txnTopicPartition, txnIdAndMarker)
+            case Right(Some(epochAndMetadata)) =>
+              if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
+                info(s"The cached metadata has changed to $epochAndMetadata (old coordinator
epoch is $coordinatorEpoch) since preparing to send markers; cancel sending markers to its
partition leaders")
+                txnMarkerPurgatory.cancelForKey(transactionalId)
+              } else {
+                // if the leader of the partition is unknown, skip sending the txn marker
since
+                // the partition is likely to be deleted already
+                info(s"Couldn't find leader endpoint for partitions $topicPartitions while
trying to send transaction markers for " +
+                  s"$transactionalId, these partitions are likely deleted already and hence
can be skipped")
+
+                val txnMetadata = epochAndMetadata.transactionMetadata
+
+                txnMetadata synchronized {
+                  topicPartitions.foreach(txnMetadata.removePartition)
+                }
+
+                txnMarkerPurgatory.checkAndComplete(transactionalId)
+              }
+
+            case Right(None) =>
+              throw new IllegalStateException(s"The coordinator still owns the transaction partition for $transactionalId, but there is " +
+                s"no metadata in the cache; this is not expected")
+          }
+      }
     }
 
     networkClient.wakeup()
   }
 
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
+    markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
+      for (entry: TxnIdAndMarkerEntry <- queue.asScala)
+        removeMarkersForTxnId(entry.txnId)
+    }
+
     markersQueuePerBroker.foreach { case(_, brokerQueue) =>
      brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
         for (entry: TxnIdAndMarkerEntry <- queue.asScala)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d834e2aa/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dd6f18d..459cb27 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1558,7 +1558,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
      val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
        partitionsToAdd.asScala.partition { tp =>
-          authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
+          authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp)
        }

      val unauthorizedForWriteRequestInfo = existingAndAuthorizedForDescribeTopics.filterNot { tp =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d834e2aa/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4e1cd37..466645b 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -160,20 +160,20 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
     inReadLock(partitionMetadataLock) {
-      cache.get(topic).flatMap(_.get(partitionId)) match {
-        case Some(partitionInfo) =>
-          val leaderId = partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
-          try {
-            getAliveEndpoint(leaderId, listenerName)
-          } catch {
-            case e: BrokerEndPointNotAvailableException =>
-              None
-          }
-
-        case None =>
-          None
+      cache.get(topic).flatMap(_.get(partitionId)) map { partitionInfo =>
+        val leaderId = partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+
+        aliveNodes.get(leaderId) match {
+          case Some(nodeMap) =>
+            nodeMap.getOrElse(listenerName, Node.noNode)
+          case None =>
+            Node.noNode
+        }
       }
     }
   }
@@ -235,6 +235,8 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined
+
   private def removePartitionInfo(topic: String, partitionId: Int): Boolean = {
     cache.get(topic).map { infos =>
       infos.remove(partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d834e2aa/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index cc5bfb0..5e1c9c1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -340,7 +340,6 @@ class ReplicaManager(val config: KafkaConfig,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     delayedProduceLock: Option[Object] = None) {
-
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d834e2aa/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 991bfbe..4015a4f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -43,13 +43,16 @@ class TransactionMarkerChannelManagerTest {
   private val transactionalId2 = "txnId2"
   private val producerId1 = 0.asInstanceOf[Long]
   private val producerId2 = 1.asInstanceOf[Long]
-  private val producerId3 = 1.asInstanceOf[Long]
   private val producerEpoch = 0.asInstanceOf[Short]
   private val txnTopicPartition1 = 0
   private val txnTopicPartition2 = 1
   private val coordinatorEpoch = 0
   private val txnTimeoutMs = 0
   private val txnResult = TransactionResult.COMMIT
+  private val txnMetadata1 = new TransactionMetadata(transactionalId1, producerId1, producerEpoch, txnTimeoutMs,
+    PrepareCommit, mutable.Set[TopicPartition](partition1, partition2), 0L, 0L)
+  private val txnMetadata2 = new TransactionMetadata(transactionalId2, producerId2, producerEpoch, txnTimeoutMs,
+    PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
 
   private val txnMarkerPurgatory = new DelayedOperationPurgatory[DelayedTxnMarker]("txn-purgatory-name",
     new MockTimer,
@@ -73,6 +76,13 @@ class TransactionMarkerChannelManagerTest {
     EasyMock.expect(txnStateManager.partitionFor(transactionalId2))
       .andReturn(txnTopicPartition2)
       .anyTimes()
+    EasyMock.expect(txnStateManager.getAndMaybeAddTransactionState(EasyMock.eq(transactionalId1), EasyMock.anyObject[Option[TransactionMetadata]]()))
+      .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))))
+      .anyTimes()
+    EasyMock.expect(txnStateManager.getAndMaybeAddTransactionState(EasyMock.eq(transactionalId2), EasyMock.anyObject[Option[TransactionMetadata]]()))
+      .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
+      .anyTimes()
+
     EasyMock.replay(txnStateManager)
   }
 
@@ -82,39 +92,38 @@ class TransactionMarkerChannelManagerTest {
   }
 
   @Test
-  def shouldGenerateRequestPerBroker(): Unit = {
+  def shouldGenerateRequestPerPartitionPerBroker(): Unit = {
     mockCache()
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
       EasyMock.eq(partition1.partition),
-      EasyMock.anyObject()))
-      .andReturn(Some(broker1))
-      .anyTimes()
-
+      EasyMock.anyObject())
+    ).andReturn(Some(broker1)).anyTimes()
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition2.topic),
       EasyMock.eq(partition2.partition),
-      EasyMock.anyObject()))
-      .andReturn(Some(broker2))
-      .anyTimes()
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
 
     EasyMock.replay(metadataCache)
 
-    val txnMetadata = new TransactionMetadata(transactionalId1, producerId1, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1, partition2), 0L, 0L)
-    channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
+    channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
+    channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
-    assertEquals(1, txnMarkerPurgatory.watched)
-    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
-    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(2, txnMarkerPurgatory.watched)
+    assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
     assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
 
    val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, TransactionResult.COMMIT, Utils.mkList(partition1)))).build()
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)),
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)))).build()
    val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, TransactionResult.COMMIT, Utils.mkList(partition2)))).build()
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
 
    val requests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
       (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
@@ -125,59 +134,83 @@ class TransactionMarkerChannelManagerTest {
   }
 
   @Test
-  def shouldGenerateRequestPerPartitionPerBroker(): Unit = {
+  def shouldSkipSendMarkersWhenLeaderNotFound(): Unit = {
     mockCache()
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
       EasyMock.eq(partition1.partition),
-      EasyMock.anyObject()))
-      .andReturn(Some(broker1))
-      .anyTimes()
+      EasyMock.anyObject())
+    ).andReturn(None).anyTimes()
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition2.topic),
+      EasyMock.eq(partition2.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
 
     EasyMock.replay(metadataCache)
 
-    val txnMetadata1 = new TransactionMetadata(transactionalId1, producerId1, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
-    val txnMetadata2 = new TransactionMetadata(transactionalId2, producerId2, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
     channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
-    assertEquals(2, txnMarkerPurgatory.watched)
-    assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
-    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
-    assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
-
-    val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch,
coordinatorEpoch, TransactionResult.COMMIT, Utils.mkList(partition1)),
-        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch,
TransactionResult.COMMIT, Utils.mkList(partition1)))).build()
-
-    val requests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
-      (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
-    }.toMap
-
-    assertEquals(Map(broker1 -> expectedBroker1Request), requests)
-    assertTrue(senderThread.generateRequests().isEmpty)
+    assertEquals(1, txnMarkerPurgatory.watched)
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertTrue(channelManager.queueForBroker(broker1.id).isEmpty)
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
   }
 
   @Test
-  def shouldRetryGettingLeaderWhenNotFound(): Unit = {
+  def shouldSaveForLaterWhenLeaderUnknownButNotAvailable(): Unit = {
     mockCache()
 
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
       EasyMock.eq(partition1.partition),
       EasyMock.anyObject())
-    ).andReturn(None)
-     .andReturn(None)
-     .andReturn(Some(broker1))
+    ).andReturn(Some(Node.noNode))
+      .andReturn(Some(Node.noNode))
+      .andReturn(Some(Node.noNode))
+      .andReturn(Some(Node.noNode))
+      .andReturn(Some(broker1))
+      .andReturn(Some(broker1))
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition2.topic),
+      EasyMock.eq(partition2.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
 
     EasyMock.replay(metadataCache)
 
-    channelManager.addTxnMarkersToBrokerQueue(transactionalId1, producerId1, producerEpoch, TransactionResult.COMMIT, coordinatorEpoch, Set[TopicPartition](partition1))
+    channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
+    channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
-    EasyMock.verify(metadataCache)
+    assertEquals(2, txnMarkerPurgatory.watched)
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertTrue(channelManager.queueForBroker(broker1.id).isEmpty)
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
+    assertEquals(2, channelManager.queueForUnknownBroker.totalNumMarkers())
+    assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition1))
+    assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2))
+
+    val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)),
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition1)))).build()
+    val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
+
+    val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+      (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
+    }.toMap
+
+    assertEquals(Map(broker2 -> expectedBroker2Request), firstDrainedRequests)
+
+    val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+      (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
+    }.toMap
+
+    assertEquals(Map(broker1 -> expectedBroker1Request), secondDrainedRequests)
   }
 
   @Test
@@ -187,24 +220,26 @@ class TransactionMarkerChannelManagerTest {
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.eq(partition1.topic),
       EasyMock.eq(partition1.partition),
-      EasyMock.anyObject()))
-      .andReturn(Some(broker1))
-      .anyTimes()
+      EasyMock.anyObject())
+    ).andReturn(Some(broker1)).anyTimes()
+    EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
+      EasyMock.eq(partition2.topic),
+      EasyMock.eq(partition2.partition),
+      EasyMock.anyObject())
+    ).andReturn(Some(broker2)).anyTimes()
 
     EasyMock.replay(metadataCache)
 
-    val txnMetadata1 = new TransactionMetadata(transactionalId1, producerId1, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
     channelManager.addTxnMarkersToSend(transactionalId1, coordinatorEpoch, txnResult, txnMetadata1, txnMetadata1.prepareComplete(time.milliseconds()))
-
-    val txnMetadata2 = new TransactionMetadata(transactionalId2, producerId2, producerEpoch, txnTimeoutMs,
-      PrepareCommit, mutable.Set[TopicPartition](partition1), 0L, 0L)
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnMetadata2.prepareComplete(time.milliseconds()))
 
     assertEquals(2, txnMarkerPurgatory.watched)
     assertEquals(2, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(1, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
 
     channelManager.removeMarkersForTxnTopicPartition(txnTopicPartition1)
 
@@ -212,5 +247,8 @@ class TransactionMarkerChannelManagerTest {
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers())
     assertEquals(0, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition1))
     assertEquals(1, channelManager.queueForBroker(broker1.id).get.totalNumMarkers(txnTopicPartition2))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers())
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition1))
+    assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
   }
 }

