This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c050503 KAFKA-7709: Fix ConcurrentModificationException when retrieving expired
inflight batches on multiple partitions. (#6005)
c050503 is described below
commit c05050346468cc27bcfb3a43bde90c47533a386c
Author: Mark Cho <markcho.011@gmail.com>
AuthorDate: Thu Dec 6 16:55:01 2018 -0800
KAFKA-7709: Fix ConcurrentModificationException when retrieving expired inflight batches
on multiple partitions. (#6005)
Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
.../kafka/clients/producer/internals/Sender.java | 10 +++---
.../clients/producer/internals/SenderTest.java | 38 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19d7af2..644f456 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -174,8 +174,9 @@ public class Sender implements Runnable {
*/
private List<ProducerBatch> getExpiredInflightBatches(long now) {
List<ProducerBatch> expiredBatches = new ArrayList<>();
- for (Map.Entry<TopicPartition, List<ProducerBatch>> entry : inFlightBatches.entrySet()) {
- TopicPartition topicPartition = entry.getKey();
+
+ for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) {
+ Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next();
List<ProducerBatch> partitionInFlightBatches = entry.getValue();
if (partitionInFlightBatches != null) {
Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
@@ -197,8 +198,9 @@ public class Sender implements Runnable {
break;
}
}
- if (partitionInFlightBatches.isEmpty())
- inFlightBatches.remove(topicPartition);
+ if (partitionInFlightBatches.isEmpty()) {
+ batchIt.remove();
+ }
}
}
return expiredBatches;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 606637e..b3146dd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2079,6 +2079,44 @@ public class SenderTest {
}
+ @Test
+ public void testExpiredBatchesInMultiplePartitions() throws Exception {
+ long deliveryTimeoutMs = 1500L;
+ setupWithTransactionState(null, true, null);
+
+ // Send multiple ProduceRequest across multiple partitions.
+ Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "k1".getBytes(), "v1".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+ Future<RecordMetadata> request2 = accumulator.append(tp1, time.milliseconds(), "k2".getBytes(), "v2".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+ // Send request.
+ sender.run(time.milliseconds());
+ assertEquals(1, client.inFlightRequestCount());
+ assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());
+
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
+ responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
+ client.respond(new ProduceResponse(responseMap));
+
+ // Successfully expire both batches.
+ time.sleep(deliveryTimeoutMs);
+ sender.run(time.milliseconds());
+ assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size());
+
+ try {
+ request1.get();
+ fail("The expired batch should throw a TimeoutException");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ }
+
+ try {
+ request2.get();
+ fail("The expired batch should throw a TimeoutException");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TimeoutException);
+ }
+ }
+
private class MatchingBufferPool extends BufferPool {
IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;
|