kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5269; Retry on unknown topic/partition error in transactional requests
Date Sat, 20 May 2017 17:22:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 037f63882 -> 55330cc29


KAFKA-5269; Retry on unknown topic/partition error in transactional requests

We should retry AddPartitionsToTxnRequest and TxnOffsetCommitRequest when receiving an UNKNOWN_TOPIC_OR_PARTITION
error.

As described in the JIRA: It turns out that the `UNKNOWN_TOPIC_OR_PARTITION` error is returned from
the request handler in KafkaApis for the AddPartitionsToTxn and the TxnOffsetCommitRequest
when the broker's metadata doesn't contain one or more partitions in the request. This can
happen for instance when the broker is bounced and has not received the cluster metadata yet.

We should retry in these cases, as this is the model followed by the consumer when committing
offsets, and by the producer with a ProduceRequest.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #3094 from apurvam/KAFKA-5269-handle-unknown-topic-partition-in-transaction-manager


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55330cc2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55330cc2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55330cc2

Branch: refs/heads/0.11.0
Commit: 55330cc2931cc05cb1172cc207ada60154a6250a
Parents: 037f638
Author: Apurva Mehta <apurva@confluent.io>
Authored: Fri May 19 18:51:37 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Sat May 20 10:03:39 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   3 +-
 .../producer/internals/TransactionManager.java  |  10 +-
 .../apache/kafka/common/protocol/Errors.java    |   3 +-
 .../internals/TransactionManagerTest.java       | 154 +++++++++++++++----
 .../kafka/api/TransactionsBounceTest.scala      |   4 +-
 5 files changed, 140 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/55330cc2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1ba13b2..ac0169a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.RecordBatch;
@@ -696,7 +697,7 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
             throw new IllegalStateException("Cannot perform a 'send' before completing a
call to initTransactions when transactions are enabled.");
 
         if (transactionManager.isFenced())
-            throw new ProducerFencedException("The current producer has been fenced off by
a another producer using the same transactional id.");
+            throw Errors.INVALID_PRODUCER_EPOCH.exception();
 
         if (transactionManager.isInErrorState()) {
             String errorMessage =

http://git-wip-us.apache.org/repos/asf/kafka/blob/55330cc2/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 55c1782..c6787f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -179,7 +179,7 @@ public class TransactionManager {
     public synchronized TransactionalRequestResult beginAbortingTransaction() {
         ensureTransactional();
         if (isFenced())
-            throw new ProducerFencedException("There is a newer producer using the same transactional.id.");
+            throw Errors.INVALID_PRODUCER_EPOCH.exception();
         transitionTo(State.ABORTING_TRANSACTION);
         return beginCompletingTransaction(false);
     }
@@ -424,7 +424,7 @@ public class TransactionManager {
 
     private void maybeFailWithError() {
         if (isFenced())
-            throw new ProducerFencedException("There is a newer producer instance using the
same transactional id.");
+            throw Errors.INVALID_PRODUCER_EPOCH.exception();
         if (isInErrorState()) {
             String errorMessage = "Cannot execute transactional method because we are in
an error state.";
             if (lastError != null)
@@ -631,12 +631,12 @@ public class TransactionManager {
                 if (error == Errors.NONE || error == null) {
                     continue;
                 }
-
                 if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR)
{
                     lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION,
transactionalId);
                     reenqueue();
                     return;
-                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS)
{
+                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS
+                        || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                     reenqueue();
                     return;
                 } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
@@ -848,6 +848,8 @@ public class TransactionManager {
                         coordinatorReloaded = true;
                         lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, builder.consumerGroupId());
                     }
+                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                    hadFailure = true;
                 } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
                     fenced();
                     return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/55330cc2/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index db94b2c..f94fb4d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -429,7 +429,8 @@ public enum Errors {
                 return new DuplicateSequenceNumberException(message);
             }
         }),
-    INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch",
+    INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch. Either
there is a newer producer " +
+            "with the same transactionalId, or the producer's transaction has been expired
by the broker.",
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/55330cc2/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6a35061..fcf0488 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -190,17 +190,7 @@ public class TransactionManagerTest {
 
         assertFalse(transactionManager.hasPendingOffsetCommits());
 
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest)
body;
-                assertEquals(consumerGroupId, addOffsetsToTxnRequest.consumerGroupId());
-                assertEquals(transactionalId, addOffsetsToTxnRequest.transactionalId());
-                assertEquals(pid, addOffsetsToTxnRequest.producerId());
-                assertEquals(epoch, addOffsetsToTxnRequest.producerEpoch());
-                return true;
-            }
-        }, new AddOffsetsToTxnResponse(0, Errors.NONE));
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
 
         sender.run(time.milliseconds());  // Send AddOffsetsRequest
         assertTrue(transactionManager.hasPendingOffsetCommits());  // We should now have
created and queued the offset commit request.
@@ -210,17 +200,7 @@ public class TransactionManagerTest {
         txnOffsetCommitResponse.put(tp1, Errors.NONE);
 
         prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP,
consumerGroupId);
-
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest)
body;
-                assertEquals(consumerGroupId, txnOffsetCommitRequest.consumerGroupId());
-                assertEquals(pid, txnOffsetCommitRequest.producerId());
-                assertEquals(epoch, txnOffsetCommitRequest.producerEpoch());
-                return true;
-            }
-        }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
+        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
 
         assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
         sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find
we don't have a group coordinator.
@@ -542,15 +522,106 @@ public class TransactionManagerTest {
         assertTrue(transactionManager.isReadyForTransaction());  // make sure we are ready
for a transaction now.
     }
 
+    @Test
+    public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException
{
+        client.setNode(brokerNode);
+        // This is called from the initTransactions method in the producer as the first order
of business.
+        // It finds the coordinator and then gets a PID.
+        final long pid = 13131L;
+        final short epoch = 1;
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION,
transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+
+        sender.run(time.milliseconds());  // get pid.
+
+        assertTrue(transactionManager.hasProducerId());
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
"key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager),
MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch,
pid);
+
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        assertFalse(transactionManager.transactionContainsPartition(tp0));  // The partition
should not yet be added.
+
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest successfully.
+        assertTrue(transactionManager.transactionContainsPartition(tp0));
+
+        sender.run(time.milliseconds());  // Send ProduceRequest.
+        assertTrue(responseFuture.isDone());
+    }
 
     @Test
-    public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws
Exception {
-        verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
+    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() throws InterruptedException
{
+        client.setNode(brokerNode);
+        // This is called from the initTransactions method in the producer as the first order
of business.
+        // It finds the coordinator and then gets a PID.
+        final long pid = 13131L;
+        final short epoch = 1;
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION,
transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+
+        sender.run(time.milliseconds());  // get pid.
+
+        assertTrue(transactionManager.hasProducerId());
+        transactionManager.beginTransaction();
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp1, new OffsetAndMetadata(1));
+        final String consumerGroupId = "myconsumergroup";
+
+        TransactionalRequestResult addOffsetsResult = transactionManager.sendOffsetsToTransaction(offsets,
consumerGroupId);
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
+
+        sender.run(time.milliseconds());  // send AddOffsetsToTxnResult
+
+        assertFalse(addOffsetsResult.isCompleted());  // The request should complete only
after the TxnOffsetCommit completes.
+
+        Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
+        txnOffsetCommitResponse.put(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP,
consumerGroupId);
+        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
+
+        assertEquals(null, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
+        sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find
we don't have a group coordinator.
+        sender.run(time.milliseconds());  // send find coordinator for group request
+        assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
+        assertTrue(transactionManager.hasPendingOffsetCommits());
+
+        sender.run(time.milliseconds());  // send TxnOffsetCommitRequest request.
+
+        assertTrue(transactionManager.hasPendingOffsetCommits());  // The TxnOffsetCommit
failed.
+        assertFalse(addOffsetsResult.isCompleted());  // We should only be done after both
RPCs complete successfully.
+
+        txnOffsetCommitResponse.put(tp1, Errors.NONE);
+        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
+        sender.run(time.milliseconds());  // Send TxnOffsetCommitRequest again.
+
+        assertTrue(addOffsetsResult.isCompleted());
+        assertTrue(addOffsetsResult.isSuccessful());
     }
 
     @Test
-    public void shouldNotAddPartitionsToTransactionWhenUnknownTopicOrPartition() throws Exception
{
-        verifyAddPartitionsFailsWithPartitionLevelError(Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws
Exception {
+        verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
     }
 
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws
InterruptedException {
@@ -679,10 +750,41 @@ public class TransactionManagerTest {
         }, new EndTxnResponse(0, error));
     }
 
+    private void prepareAddOffsetsToTxnResponse(Errors error, final String consumerGroupId,
final long producerId,
+                                                final short producerEpoch) {
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest)
body;
+                assertEquals(consumerGroupId, addOffsetsToTxnRequest.consumerGroupId());
+                assertEquals(transactionalId, addOffsetsToTxnRequest.transactionalId());
+                assertEquals(producerId, addOffsetsToTxnRequest.producerId());
+                assertEquals(producerEpoch, addOffsetsToTxnRequest.producerEpoch());
+                return true;
+            }
+        }, new AddOffsetsToTxnResponse(0, error));
+    }
+
+    private void prepareTxnOffsetCommitResponse(final String consumerGroupId, final long
producerId,
+                                                final short producerEpoch, Map<TopicPartition,
Errors> txnOffsetCommitResponse) {
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest)
body;
+                assertEquals(consumerGroupId, txnOffsetCommitRequest.consumerGroupId());
+                assertEquals(producerId, txnOffsetCommitRequest.producerId());
+                assertEquals(producerEpoch, txnOffsetCommitRequest.producerEpoch());
+                return true;
+            }
+        }, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
+
+    }
+
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error,
int throttleTimeMs) {
         ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error,
offset, RecordBatch.NO_TIMESTAMP);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp,
resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/55330cc2/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index f1fd365..110e680 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -72,7 +72,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
-  @Ignore  // need to fix KAFKA-5268 and KAFKA-5269 before re-enabling
+  @Ignore  // Disabling this as it is flaky on Jenkins.
   @Test
   def testBrokerFailure() {
     // basic idea is to seed a topic with 10000 records, and copy it transactionally while
bouncing brokers
@@ -99,7 +99,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
         val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead)
         trace(s"received ${records.size} messages. sending them transactionally to $outputTopic")
         producer.beginTransaction()
-        val shouldAbort = iteration % 10 == 0
+        val shouldAbort = iteration % 2 == 0
         records.zipWithIndex.foreach { case (record, i) =>
           producer.send(
             TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, record.key,
record.value, !shouldAbort),


Mime
View raw message