kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
Date Thu, 07 Mar 2019 22:28:40 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f525cd  KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
1f525cd is described below

commit 1f525cd2c90ec81fa9c3967d1303a905d1afccb2
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
AuthorDate: Fri Mar 8 03:58:21 2019 +0530

    KAFKA-8061; Handle concurrent ProducerId reset and call to Sender thread shutdown (#6388)
    
    In KAFKA-5503, we added a check for the `running` flag in the loop inside maybeWaitForProducerId. This handles a concurrent call to Sender close() while we attempt to get the ProducerId, and avoids blocking indefinitely when the producer is shutting down.
    
    This created a corner case where the Sender thread gets blocked if a producerId reset and a call to Sender thread close happen concurrently. The fix is to check the `forceClose` flag in the loop inside maybeWaitForProducerId instead of the `running` flag.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
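
    For context, a minimal sketch of how the two close paths interact with the wait
    loop after this fix (simplified class with placeholder bodies; not the actual
    Sender implementation):

        // Simplified sketch -- the real Sender has more state and error handling.
        class SenderSketch {
            private volatile boolean running = true;     // cleared by any close()
            private volatile boolean forceClose = false; // set only by forceClose()

            void initiateClose() { running = false; }    // graceful: drain pending batches first
            void forceClose() { forceClose = true; running = false; } // abort pending batches

            void maybeWaitForProducerId() {
                // Checking `running` here meant a graceful close could never obtain a
                // fresh ProducerId after a reset, so pending batches could not drain
                // and the Sender thread stayed blocked. Checking `forceClose` lets a
                // graceful close keep waiting, while a force close still exits at once.
                while (!forceClose && !hasProducerId() && !hasError()) {
                    // placeholder: the real loop requests a new ProducerId from the
                    // least-loaded broker, which eventually makes hasProducerId() true
                }
            }

            private boolean hasProducerId() { return false; } // placeholder
            private boolean hasError() { return false; }      // placeholder
        }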
---
 .../kafka/clients/producer/internals/Sender.java   |  2 +-
 .../clients/producer/internals/SenderTest.java     | 75 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 3de2eb4..d003f4d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -494,7 +494,7 @@ public class Sender implements Runnable {
     }
 
     private void maybeWaitForProducerId() {
-        while (running && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
+        while (!forceClose && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
             Node node = null;
             try {
                 node = awaitLeastLoadedNodeReady(requestTimeoutMs);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8d599c9..0aa752e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -84,6 +84,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -1197,6 +1198,80 @@ public class SenderTest {
     }
 
     @Test
+    public void testCloseWithProducerIdReset() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
+        setupWithTransactionState(transactionManager);
+
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
+            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect and send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.initiateClose(); // initiate close
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                prepareInitPidResponse(Errors.NONE, producerId + 1, (short) 1);
+                sender.run(time.milliseconds());
+                return !accumulator.hasUndrained();
+            }
+        }, 5000, "Failed to drain batches");
+    }
+
+    @Test
+    public void testForceCloseWithProducerIdReset() throws Exception {
+        TransactionManager transactionManager = new TransactionManager();
+        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(1L, (short) 0));
+        setupWithTransactionState(transactionManager);
+
+        Metrics m = new Metrics();
+        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
+            senderMetrics, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
+            "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect and send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
+        client.respond(produceResponse(responses));
+        sender.run(time.milliseconds());
+        assertTrue(failedResponse.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
+        sender.forceClose(); // initiate force close
+        sender.run(time.milliseconds()); // this should not block
+        sender.run(); // run main loop to test forceClose flag
+        assertTrue("Pending batches are not aborted.", !accumulator.hasUndrained());
+        assertTrue(successfulResponse.isDone());
+    }
+
+    @Test
     public void testBatchesDrainedWithOldProducerIdShouldFailWithOutOfOrderSequenceOnSubsequentRetry() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
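
For reference, the first new test polls with TestUtils.waitForCondition and an anonymous
TestCondition. Roughly, such a helper retries the condition until it holds or the timeout
elapses; a minimal, self-contained sketch of that polling pattern (assumed internals, not
Kafka's actual TestUtils code):

    // Sketch of the polling pattern behind TestUtils.waitForCondition
    // (assumed internals; the real helper may differ).
    interface Condition { boolean conditionMet(); } // mirrors org.apache.kafka.test.TestCondition

    static void waitForCondition(Condition condition, long maxWaitMs, String details)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.conditionMet()) {
            if (System.currentTimeMillis() > deadline)
                throw new AssertionError("Condition not met within timeout: " + details);
            Thread.sleep(100L); // poll interval (assumed)
        }
    }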

