kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7709: Fix ConcurrentModificationException when retrieving expired inflight batches on multiple partitions. (#6005)
Date Fri, 07 Dec 2018 00:55:13 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c050503  KAFKA-7709: Fix ConcurrentModificationException when retrieving expired inflight batches on multiple partitions. (#6005)
c050503 is described below

commit c05050346468cc27bcfb3a43bde90c47533a386c
Author: Mark Cho <markcho.011@gmail.com>
AuthorDate: Thu Dec 6 16:55:01 2018 -0800

    KAFKA-7709: Fix ConcurrentModificationException when retrieving expired inflight batches on multiple partitions. (#6005)
    
    Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/producer/internals/Sender.java   | 10 +++---
 .../clients/producer/internals/SenderTest.java     | 38 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19d7af2..644f456 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -174,8 +174,9 @@ public class Sender implements Runnable {
      */
     private List<ProducerBatch> getExpiredInflightBatches(long now) {
         List<ProducerBatch> expiredBatches = new ArrayList<>();
-        for (Map.Entry<TopicPartition, List<ProducerBatch>> entry : inFlightBatches.entrySet()) {
-            TopicPartition topicPartition = entry.getKey();
+
+        for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) {
+            Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next();
             List<ProducerBatch> partitionInFlightBatches = entry.getValue();
             if (partitionInFlightBatches != null) {
                 Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
@@ -197,8 +198,9 @@ public class Sender implements Runnable {
                         break;
                     }
                 }
-                if (partitionInFlightBatches.isEmpty())
-                    inFlightBatches.remove(topicPartition);
+                if (partitionInFlightBatches.isEmpty()) {
+                    batchIt.remove();
+                }
             }
         }
         return expiredBatches;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 606637e..b3146dd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2079,6 +2079,44 @@ public class SenderTest {
 
     }
 
+    @Test
+    public void testExpiredBatchesInMultiplePartitions() throws Exception {
+        long deliveryTimeoutMs = 1500L;
+        setupWithTransactionState(null, true, null);
+
+        // Send multiple ProduceRequest across multiple partitions.
+        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "k1".getBytes(), "v1".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> request2 = accumulator.append(tp1, time.milliseconds(), "k2".getBytes(), "v2".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+        // Send request.
+        sender.run(time.milliseconds());
+        assertEquals(1, client.inFlightRequestCount());
+        assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());
+
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
+        responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+        client.respond(new ProduceResponse(responseMap));
+
+        // Successfully expire both batches.
+        time.sleep(deliveryTimeoutMs);
+        sender.run(time.milliseconds());
+        assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size());
+
+        try {
+            request1.get();
+            fail("The expired batch should throw a TimeoutException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+
+        try {
+            request2.get();
+            fail("The expired batch should throw a TimeoutException");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        }
+    }
+
     private class MatchingBufferPool extends BufferPool {
         IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;
 


Mime
View raw message