kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7501: Fix producer batch double deallocation when receiving message too large error on expired batch (#5807)
Date Mon, 22 Oct 2018 09:52:09 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 9e47dc0  KAFKA-7501: Fix producer batch double deallocation when receiving message too large error on expired batch (#5807)
9e47dc0 is described below

commit 9e47dc0fcc84b925f33bd8985ea570d5aed0bf51
Author: Xiongqi Wu <xiongqi.wu@gmail.com>
AuthorDate: Mon Oct 22 01:36:24 2018 -0700

    KAFKA-7501: Fix producer batch double deallocation when receiving message too large error on expired batch (#5807)
    
    Minor clean-ups for clarity included.
    
    Reviewers: Dong Lin <lindong28@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../clients/producer/internals/ProducerBatch.java  | 10 ++++++--
 .../kafka/clients/producer/internals/Sender.java   |  8 +++---
 .../clients/producer/internals/SenderTest.java     | 30 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 6e08185..0adbbf9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -72,9 +72,8 @@ public final class ProducerBatch {
     private long lastAttemptMs;
     private long lastAppendTime;
     private long drainedMs;
-    private String expiryErrorMessage;
     private boolean retry;
-    private boolean reopened = false;
+    private boolean reopened;
 
     public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
         this(tp, recordsBuilder, createdMs, false);
@@ -157,6 +156,13 @@ public final class ProducerBatch {
     }
 
     /**
+     * Return `true` if {@link #done(long, long, RuntimeException)} has been invoked at least once, `false` otherwise.
+     */
+    public boolean isDone() {
+        return finalState() != null;
+    }
+
+    /**
      * Finalize the state of a batch. Final state, once set, is immutable. This function may be called
      * once or twice on a batch. It may be called twice if
      * 1. An inflight batch expires before a response from the broker is received. The batch's final
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index c50a85f..19d7af2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -184,9 +184,9 @@ public class Sender implements Runnable {
                     if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
                         iter.remove();
                         // expireBatches is called in Sender.sendProducerData, before client.poll.
-                        // The batch.finalState() == null invariant should always hold. An IllegalStateException
+                        // The !batch.isDone() invariant should always hold. An IllegalStateException
                         // exception will be thrown if the invariant is violated.
-                        if (batch.finalState() == null) {
+                        if (!batch.isDone()) {
                             expiredBatches.add(batch);
                         } else {
                             throw new IllegalStateException(batch.topicPartition + " batch created at " +
@@ -576,7 +576,7 @@ public class Sender implements Runnable {
                                long now, long throttleUntilTimeMs) {
         Errors error = response.error;
 
-        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
+        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
                 (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
             // If the batch is too large, we split the batch and send the split batches again. We do not decrement
             // the retry attempts in this case.
@@ -726,7 +726,7 @@ public class Sender implements Runnable {
     private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
         return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
             batch.attempts() < this.retries &&
-            batch.finalState() == null &&
+            !batch.isDone() &&
             ((response.error.exception() instanceof RetriableException) ||
                 (transactionManager != null && transactionManager.canRetry(response, batch)));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 23ca2ae..8a8ddd3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2037,6 +2037,36 @@ public class SenderTest {
     }
 
     @Test
+    public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
+        long deliverTimeoutMs = 1500L;
+        // create a producer batch with more than one record so it is eligible to split
+        Future<RecordMetadata> request1 =
+            accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
+                MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> request2 =
+            accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
+                MAX_BLOCK_TIMEOUT).future;
+
+        sender.run(time.milliseconds());  // send request
+        assertEquals(1, client.inFlightRequestCount());
+        // return a MESSAGE_TOO_LARGE error
+        client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
+
+        time.sleep(deliverTimeoutMs);
+        // expire the batch and process the response
+        sender.run(time.milliseconds());
+        assertTrue(request1.isDone());
+        assertTrue(request2.isDone());
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+
+        // run again and must not split big batch and resend anything.
+        sender.run(time.milliseconds());
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+    }
+
+    @Test
     public void testResetNextBatchExpiry() throws Exception {
         client = spy(new MockClient(time));
 


Mime
View raw message