From commits-return-3592-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Fri Feb 12 09:15:53 2016 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3404C188DC for ; Fri, 12 Feb 2016 09:15:53 +0000 (UTC) Received: (qmail 70740 invoked by uid 500); 12 Feb 2016 09:15:53 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 70710 invoked by uid 500); 12 Feb 2016 09:15:53 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 70701 invoked by uid 99); 12 Feb 2016 09:15:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Feb 2016 09:15:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E039EE03CD; Fri, 12 Feb 2016 09:15:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: <6a5f6e5f362f4f87ad313db4c83008aa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-3147; Memory records is not writable in MirrorMaker Date: Fri, 12 Feb 2016 09:15:52 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk f141e647a -> 85599bc3e KAFKA-3147; Memory records is not writable in MirrorMaker Remove the batch from the RecordAccumulator once its closed while aborting batches. Make sure we don't accept new batch appends to RecordAccumulator while the producer is being closed. Author: Mayuresh Gharat Reviewers: Jiangjie Qin, Ismael Juma, Guozhang Wang Closes #825 from MayureshGharat/KAFKA-3147 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85599bc3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85599bc3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85599bc3 Branch: refs/heads/trunk Commit: 85599bc3e861c77cd91c55273debe396da85deeb Parents: f141e64 Author: Mayuresh Gharat Authored: Fri Feb 12 17:15:41 2016 +0800 Committer: Guozhang Wang Committed: Fri Feb 12 17:15:41 2016 +0800 ---------------------------------------------------------------------- .../kafka/clients/producer/internals/RecordAccumulator.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/85599bc3/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index d36234c..3c710c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -156,11 +156,11 @@ public final class RecordAccumulator { // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); try { - if (closed) - throw new IllegalStateException("Cannot send after the producer is closed."); // check if we have an in-progress batch Deque dq = dequeFor(tp); synchronized (dq) { + if (closed) + throw new IllegalStateException("Cannot send after the producer is closed."); RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); @@ -452,6 +452,7 @@ public final class RecordAccumulator { // Close the batch before aborting synchronized (dq) { batch.records.close(); + dq.remove(batch); } batch.done(-1L, new IllegalStateException("Producer is closed forcefully.")); deallocate(batch);