kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-5610; WriteTxnMarker handler should return UNKNOWN_TOPIC_OR_PARTITION if replica is not available
Date: Thu, 20 Jul 2017 20:08:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 8ebe41175 -> 930eefdab


KAFKA-5610; WriteTxnMarker handler should return UNKNOWN_TOPIC_OR_PARTITION if replica is not available

Before this patch, we would instead return the non-retriable `UNSUPPORTED_FOR_MESSAGE_FORMAT` error when the replica was not available, causing markers to be lost.
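
For context, a minimal sketch of the new per-partition mapping follows. The standalone `classify` helper and its `getMagic` parameter are illustrative assumptions, not part of the patch; they only mirror the updated logic in `handleWriteTxnMarkersRequest`, where `ReplicaManager.getMagic` returns `None` when the broker does not host the partition:

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.record.RecordBatch

    // Hypothetical helper mirroring the patched classification:
    // a partition the broker does not host maps to the retriable
    // UNKNOWN_TOPIC_OR_PARTITION, an old message format maps to the
    // non-retriable UNSUPPORTED_FOR_MESSAGE_FORMAT, and everything else
    // is kept for the marker append.
    def classify(partition: TopicPartition,
                 getMagic: TopicPartition => Option[Byte]): Either[Errors, TopicPartition] =
      getMagic(partition) match {
        case None => Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
        case Some(magic) if magic < RecordBatch.MAGIC_VALUE_V2 => Left(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)
        case Some(_) => Right(partition)
      }

Because UNKNOWN_TOPIC_OR_PARTITION is retriable, the intent is that the marker write can be retried rather than dropped.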

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3550 from apurvam/KAFKA-5610-handleWriteTxnMarker-should-handle-emigration

(cherry picked from commit cdff011fe02cb8e14cb5bf226e47b83cbbec8f3c)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/930eefda
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/930eefda
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/930eefda

Branch: refs/heads/0.11.0
Commit: 930eefdab9a0fb7151f8382ca157a3ed9626fd4a
Parents: 8ebe411
Author: Apurva Mehta <apurva@confluent.io>
Authored: Thu Jul 20 13:05:40 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Jul 20 13:08:10 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 23 ++++----
 .../scala/unit/kafka/server/KafkaApisTest.scala | 57 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/930eefda/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 31c3d5a..5911d4d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1543,24 +1543,29 @@ class KafkaApis(val requestChannel: RequestChannel,
     var skippedMarkers = 0
     for (marker <- markers.asScala) {
       val producerId = marker.producerId
-      val (goodPartitions, partitionsWithIncorrectMessageFormat) = marker.partitions.asScala.partition { partition =>
+      val partitionsWithCompatibleMessageFormat = new mutable.ArrayBuffer[TopicPartition]
+
+      val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
+      marker.partitions.asScala.foreach { partition =>
         replicaManager.getMagic(partition) match {
-          case Some(magic) if magic >= RecordBatch.MAGIC_VALUE_V2 => true
-          case _ => false
+          case Some(magic) =>
+            if (magic < RecordBatch.MAGIC_VALUE_V2)
+              currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)
+            else
+              partitionsWithCompatibleMessageFormat += partition
+          case None =>
+            currentErrors.put(partition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         }
       }
 
-      if (partitionsWithIncorrectMessageFormat.nonEmpty) {
-        val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
-        partitionsWithIncorrectMessageFormat.foreach { partition => currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) }
+      if (!currentErrors.isEmpty)
         updateErrors(producerId, currentErrors)
-      }
 
-      if (goodPartitions.isEmpty) {
+      if (partitionsWithCompatibleMessageFormat.isEmpty) {
         numAppends.decrementAndGet()
         skippedMarkers += 1
       } else {
-        val controlRecords = goodPartitions.map { partition =>
+        val controlRecords = partitionsWithCompatibleMessageFormat.map { partition =>
           val controlRecordType = marker.transactionResult match {
             case TransactionResult.COMMIT => ControlRecordType.COMMIT
             case TransactionResult.ABORT => ControlRecordType.ABORT

http://git-wip-us.apache.org/repos/asf/kafka/blob/930eefda/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b48c3d0..86bd135 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -140,6 +140,25 @@ class KafkaApisTest {
   }
 
   @Test
+  def shouldRespondWithUnknownTopicWhenPartitionIsNotHosted(): Unit = {
+    val topicPartition = new TopicPartition("t", 0)
+    val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
+    val expectedErrors = Map(topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION).asJava
+    val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.getMagic(topicPartition))
+      .andReturn(None)
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+
+    createKafkaApis().handleWriteTxnMarkersRequest(request)
+
+    val markersResponse = readResponse(ApiKeys.WRITE_TXN_MARKERS, writeTxnMarkersRequest, capturedResponse)
+      .asInstanceOf[WriteTxnMarkersResponse]
+    assertEquals(expectedErrors, markersResponse.errors(1))
+  }
+
+  @Test
   def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
     val tp1 = new TopicPartition("t", 0)
     val tp2 = new TopicPartition("t1", 0)
@@ -178,6 +197,44 @@ class KafkaApisTest {
   }
 
   @Test
+  def shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
+    val tp1 = new TopicPartition("t", 0)
+    val tp2 = new TopicPartition("t1", 0)
+    val (writeTxnMarkersRequest, request) = createWriteTxnMarkersRequest(Utils.mkList(tp1, tp2))
+    val expectedErrors = Map(tp1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION, tp2 -> Errors.NONE).asJava
+
+    val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+    val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit]  = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.getMagic(tp1))
+      .andReturn(None)
+    EasyMock.expect(replicaManager.getMagic(tp2))
+      .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      EasyMock.eq(true),
+      EasyMock.eq(false),
+      EasyMock.anyObject(),
+      EasyMock.capture(responseCallback),
+      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
+      override def answer(): Unit = {
+        responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
+      }
+    })
+
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+
+    createKafkaApis().handleWriteTxnMarkersRequest(request)
+
+    val markersResponse = readResponse(ApiKeys.WRITE_TXN_MARKERS, writeTxnMarkersRequest, capturedResponse)
+      .asInstanceOf[WriteTxnMarkersResponse]
+    assertEquals(expectedErrors, markersResponse.errors(1))
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
   def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
     val topicPartition = new TopicPartition("t", 0)
     val request = createWriteTxnMarkersRequest(Utils.mkList(topicPartition))._2

