kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch 2.0 updated: KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
Date: Thu, 07 Mar 2019 23:15:28 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new a3da6fb  KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
a3da6fb is described below

commit a3da6fba1ee2626cb154df0a7381c02ff2f9470b
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
AuthorDate: Fri Mar 8 03:58:21 2019 +0530

    KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
    
    In KAFKA-5503, we added a check for the `running` flag in the loop inside maybeWaitForProducerId.
    This handles a concurrent call to Sender close() while we attempt to get the ProducerId, and
    avoids blocking indefinitely when the producer is shutting down.
    
    This created a corner case where the Sender thread gets blocked if a producerId reset and a
    call to Sender close() happen concurrently. The fix here is to check the `forceClose` flag in
    the loop inside maybeWaitForProducerId instead of the `running` flag.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/producer/internals/Sender.java   |  2 +-
 .../clients/producer/internals/SenderTest.java     | 75 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)
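
For context, the race described in the commit message reduces to three flags. The sketch below is a hypothetical stand-in with illustrative names, not the real Sender internals: under the old `running` guard, a graceful close that races with a producerId reset makes maybeWaitForProducerId return immediately, so the drain phase can never send the stranded batches and spins forever; guarding on `forceClose` instead lets a graceful close still re-acquire a producerId, while a forced close aborts the wait promptly.

// Hypothetical sketch (illustrative names only, not the real
// org.apache.kafka.clients.producer.internals.Sender).
public class SenderShutdownSketch {
    private volatile boolean running = true;     // cleared by initiateClose()
    private volatile boolean forceClose = false; // set by forceClose()
    private volatile boolean hasProducerId = true;
    private int undrainedBatches = 1;

    // The fixed guard: only a force close aborts the wait. The old guard,
    // `while (running && !hasProducerId)`, returned immediately once
    // initiateClose() cleared `running`, so a batch stranded by a producerId
    // reset could never be sent and the drain loop below spun forever.
    private void maybeWaitForProducerId() {
        while (!forceClose && !hasProducerId)
            hasProducerId = true; // stand-in for a successful InitProducerId round trip
    }

    private void runOnce() {
        maybeWaitForProducerId();
        if (hasProducerId && undrainedBatches > 0)
            undrainedBatches--; // "send" one pending batch
    }

    // Mirrors the shape of Sender.run(): main loop, then a drain phase
    // that only a force close may skip.
    public void runLoop() {
        while (running)
            runOnce();
        while (!forceClose && undrainedBatches > 0)
            runOnce();
    }

    public void initiateClose() { running = false; }
    public void forceClose() { forceClose = true; initiateClose(); }

    public static void main(String[] args) {
        SenderShutdownSketch sender = new SenderShutdownSketch();
        sender.hasProducerId = false; // simulate a concurrent producerId reset
        sender.initiateClose();       // graceful close requested concurrently
        sender.runLoop();             // with the fix, this drains and returns
        System.out.println("remaining batches: " + sender.undrainedBatches);
    }
}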

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b68ce36..a0fcad0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -422,7 +422,7 @@ public class Sender implements Runnable {
     }
 
     private void maybeWaitForProducerId() {
-        while (running && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
+        while (!forceClose && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
             try {
                 Node node = awaitLeastLoadedNodeReady(requestTimeoutMs);
                 if (node != null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 87ce243..de4ce5f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -64,6 +64,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -1156,6 +1157,80 @@ public class SenderTest {
     }
 
     @Test
+    public void testCloseWithProducerIdReset() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
+        setupWithTransactionState(transactionManager);
+
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
+            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect and send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.initiateClose(); // initiate close
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                prepareInitPidResponse(Errors.NONE, producerId + 1, (short) 1);
+                sender.run(time.milliseconds());
+                return !accumulator.hasUndrained();
+            }
+        }, 5000, "Failed to drain batches");
+    }
+
+    @Test
+    public void testForceCloseWithProducerIdReset() throws Exception {
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(1L, (short) 0));
+        setupWithTransactionState(transactionManager);
+
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
+            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect and send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+        sender.forceClose(); // initiate force close
+        sender.run(time.milliseconds()); // this should not block
+        sender.run(); // run main loop to test forceClose flag
+        assertTrue("Pending batches are not aborted.", !accumulator.hasUndrained());
+        assertTrue(successfulResponse.isDone());
+    }
+
+    @Test
     public void testBatchesDrainedWithOldProducerIdShouldFailWithOutOfOrderSequenceOnSubsequentRetry() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
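
As a usage note, the force-close path these tests exercise is reached from the public producer API when close() is called with a zero timeout. A minimal, illustrative example (broker address and topic name are placeholders):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ForceCloseExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true"); // idempotent sends need a producerId

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"));

        // A zero timeout makes KafkaProducer force-close the Sender rather than
        // drain it; with this fix the call cannot hang on a concurrent producerId reset.
        producer.close(0, TimeUnit.MILLISECONDS);
    }
}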

