kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7296; Handle coordinator loading error in TxnOffsetCommit (#5514)
Date Thu, 16 Aug 2018 17:06:50 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 1b0340b  KAFKA-7296; Handle coordinator loading error in TxnOffsetCommit (#5514)
1b0340b is described below

commit 1b0340b83288b5cf0e69342796323ab084251d2b
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Aug 16 09:37:32 2018 -0700

    KAFKA-7296; Handle coordinator loading error in TxnOffsetCommit (#5514)
    
    We should check TxnOffsetCommit responses for the COORDINATOR_LOAD_IN_PROGRESS error
    code and retry if we see it. Additionally, if we encounter an abortable error, we need to
    ensure that pending transaction offset commits are cleared.
    
    Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
---
 .../producer/internals/TransactionManager.java     | 33 ++++++++++----------
 .../producer/internals/TransactionManagerTest.java | 35 +++++++++++++++-------
 2 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index b242d5a..c0685c9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1272,50 +1272,49 @@ public class TransactionManager {
         public void handleResponse(AbstractResponse response) {
             TxnOffsetCommitResponse txnOffsetCommitResponse = (TxnOffsetCommitResponse) response;
             boolean coordinatorReloaded = false;
-            boolean hadFailure = false;
             Map<TopicPartition, Errors> errors = txnOffsetCommitResponse.errors();
 
+            log.debug("Received TxnOffsetCommit response for consumer group {}: {}", builder.consumerGroupId(),
+                    errors);
+
             for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
                 TopicPartition topicPartition = entry.getKey();
                 Errors error = entry.getValue();
                 if (error == Errors.NONE) {
-                    log.debug("Successfully added offsets {} from consumer group {} to transaction.",
-                            builder.offsets(), builder.consumerGroupId());
                     pendingTxnOffsetCommits.remove(topicPartition);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR
                         || error == Errors.REQUEST_TIMED_OUT) {
-                    hadFailure = true;
                     if (!coordinatorReloaded) {
                         coordinatorReloaded = true;
                         lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, builder.consumerGroupId());
                     }
-                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                    hadFailure = true;
+                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION
+                        || error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                    // If the topic is unknown or the coordinator is loading, retry with
the current coordinator
+                    continue;
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
-                    return;
+                    break;
                 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
                         || error == Errors.INVALID_PRODUCER_EPOCH
                         || error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
                     fatalError(error.exception());
-                    return;
+                    break;
                 } else {
                     fatalError(new KafkaException("Unexpected error in TxnOffsetCommitResponse:
" + error.message()));
-                    return;
+                    break;
                 }
             }
 
-            if (!hadFailure || !result.isSuccessful()) {
-                // all attempted partitions were either successful, or there was a fatal
failure.
-                // either way, we are not retrying, so complete the request.
+            if (result.isCompleted()) {
+                pendingTxnOffsetCommits.clear();
+            } else if (pendingTxnOffsetCommits.isEmpty()) {
                 result.done();
-                return;
-            }
-
-            // retry the commits which failed with a retriable error.
-            if (!pendingTxnOffsetCommits.isEmpty())
+            } else {
+                // Retry the commits which failed with a retriable error
                 reenqueue();
+            }
         }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 558ec72..d2bc18e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -125,7 +125,8 @@ public class TransactionManagerTest {
         Metrics metrics = new Metrics(metricConfig, time);
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);
 
-        this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE,
0L, 0L, metrics, time, apiVersions, transactionManager);
+        this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE,
+                0L, 0L, metrics, time, apiVersions, transactionManager);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator,
true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager,
apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
@@ -916,13 +917,13 @@ public class TransactionManagerTest {
         final String consumerGroupId = "consumer";
         final long pid = 13131L;
         final short epoch = 1;
-        final TopicPartition tp = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
 
         doInitTransactions(pid, epoch);
 
         transactionManager.beginTransaction();
         TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
-                singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);
+                singletonMap(tp1, new OffsetAndMetadata(39L)), consumerGroupId);
 
         prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
         sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
@@ -931,7 +932,7 @@ public class TransactionManagerTest {
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
         sender.run(time.milliseconds());  // FindCoordinator Returned
 
-        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.GROUP_AUTHORIZATION_FAILED));
+        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp1, Errors.GROUP_AUTHORIZATION_FAILED));
         sender.run(time.milliseconds());  // TxnOffsetCommit Handled
 
         assertTrue(transactionManager.hasError());
@@ -939,6 +940,7 @@ public class TransactionManagerTest {
         assertTrue(sendOffsetsResult.isCompleted());
         assertFalse(sendOffsetsResult.isSuccessful());
         assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);
+        assertFalse(transactionManager.hasPendingOffsetCommits());
 
         GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error();
         assertEquals(consumerGroupId, exception.groupId());
@@ -1749,7 +1751,16 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() throws InterruptedException
{
+    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() {
+        testRetriableErrorInTxnOffsetCommit(Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    }
+
+    @Test
+    public void testHandlingOfCoordinatorLoadingErrorOnTxnOffsetCommit() {
+        testRetriableErrorInTxnOffsetCommit(Errors.COORDINATOR_LOAD_IN_PROGRESS);
+    }
+
+    private void testRetriableErrorInTxnOffsetCommit(Errors error) {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -1758,6 +1769,7 @@ public class TransactionManagerTest {
         transactionManager.beginTransaction();
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp0, new OffsetAndMetadata(1));
         offsets.put(tp1, new OffsetAndMetadata(1));
         final String consumerGroupId = "myconsumergroup";
 
@@ -1769,12 +1781,13 @@ public class TransactionManagerTest {
         assertFalse(addOffsetsResult.isCompleted());  // The request should complete only
after the TxnOffsetCommit completes.
 
         Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
-        txnOffsetCommitResponse.put(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+        txnOffsetCommitResponse.put(tp0, Errors.NONE);
+        txnOffsetCommitResponse.put(tp1, error);
 
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
 
-        assertEquals(null, transactionManager.coordinator(CoordinatorType.GROUP));
+        assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
         sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find
we don't have a group coordinator.
         sender.run(time.milliseconds());  // send find coordinator for group request
         assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
@@ -1799,7 +1812,7 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws Exception
{
+    public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -2395,7 +2408,9 @@ public class TransactionManagerTest {
         };
     }
 
-    private void prepareAddOffsetsToTxnResponse(Errors error, final String consumerGroupId,
final long producerId,
+    private void prepareAddOffsetsToTxnResponse(final Errors error,
+                                                final String consumerGroupId,
+                                                final long producerId,
                                                 final short producerEpoch) {
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
@@ -2445,7 +2460,7 @@ public class TransactionManagerTest {
 
     private void assertAbortableError(Class<? extends RuntimeException> cause) {
         try {
-            transactionManager.beginTransaction();
+            transactionManager.beginCommit();
             fail("Should have raised " + cause.getSimpleName());
         } catch (KafkaException e) {
             assertTrue(cause.isAssignableFrom(e.getCause().getClass()));


Mime
View raw message