kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6768; Transactional producer may hang in close with pending requests (#4842)
Date Mon, 09 Apr 2018 22:39:15 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a8f35b  KAFKA-6768; Transactional producer may hang in close with pending requests (#4842)
0a8f35b is described below

commit 0a8f35b68415bb3f79b0ec61df6cb0ab0db937c4
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon Apr 9 15:39:07 2018 -0700

    KAFKA-6768; Transactional producer may hang in close with pending requests (#4842)
    
    This patch fixes an edge case in producer shutdown in which `close()` never completes
    because a pending request will never be sent once shutdown has been initiated. I have
    added a test case which reproduces the scenario.
    
    Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/clients/producer/internals/Sender.java     |  2 +-
 .../producer/internals/TransactionManagerTest.java   | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 426b273..0514c99 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -329,7 +329,7 @@ public class Sender implements Runnable {
             return false;
 
         AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
-        while (running) {
+        while (!forceClose) {
             Node targetNode = null;
             try {
                 if (nextRequestHandler.needsCoordinator()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6fcf480..558ec72 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -133,6 +133,26 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testSenderShutdownWithPendingAddPartitions() throws Exception {
+        long pid = 13131L;
+        short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+
+        sender.initiateClose();
+        sender.run();
+
+        assertTrue(sendFuture.isDone());
+    }
+
+    @Test
     public void testEndTxnNotSentIfIncompleteBatches() {
         long pid = 13131L;
         short epoch = 1;

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message