Repository: kafka Updated Branches: refs/heads/trunk f141e647a -> 85599bc3e KAFKA-3147; Memory records is not writable in MirrorMaker Remove the batch from the RecordAccumulator once its closed while aborting batches. Make sure we don't accept new batch appends to RecordAccumulator while the producer is being closed. Author: Mayuresh Gharat Reviewers: Jiangjie Qin, Ismael Juma, Guozhang Wang Closes #825 from MayureshGharat/KAFKA-3147 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85599bc3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85599bc3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85599bc3 Branch: refs/heads/trunk Commit: 85599bc3e861c77cd91c55273debe396da85deeb Parents: f141e64 Author: Mayuresh Gharat Authored: Fri Feb 12 17:15:41 2016 +0800 Committer: Guozhang Wang Committed: Fri Feb 12 17:15:41 2016 +0800 ---------------------------------------------------------------------- .../kafka/clients/producer/internals/RecordAccumulator.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/85599bc3/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index d36234c..3c710c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -156,11 +156,11 @@ public final class RecordAccumulator { // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); try { - if (closed) - throw new IllegalStateException("Cannot send after the producer is closed."); // check if we have an in-progress batch Deque dq = dequeFor(tp); synchronized (dq) { + if (closed) + throw new IllegalStateException("Cannot send after the producer is closed."); RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); @@ -452,6 +452,7 @@ public final class RecordAccumulator { // Close the batch before aborting synchronized (dq) { batch.records.close(); + dq.remove(batch); } batch.done(-1L, new IllegalStateException("Producer is closed forcefully.")); deallocate(batch);