From commits-return-5488-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Thu Dec 15 21:24:04 2016 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F275519C5C for ; Thu, 15 Dec 2016 21:24:03 +0000 (UTC) Received: (qmail 43415 invoked by uid 500); 15 Dec 2016 21:24:03 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 43384 invoked by uid 500); 15 Dec 2016 21:24:03 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 43371 invoked by uid 99); 15 Dec 2016 21:24:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Dec 2016 21:24:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ACDC3DFBFF; Thu, 15 Dec 2016 21:24:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jqin@apache.org To: commits@kafka.apache.org Message-Id: <1cd363fc3585439099c7853b51fc33ca@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-4521; MirrorMaker should flush all messages before releasing partition ownership during rebalance Date: Thu, 15 Dec 2016 21:24:03 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/0.10.1 395e85412 -> 7226d6ed9 KAFKA-4521; MirrorMaker should flush all messages before releasing partition ownership during rebalance Author: Dong Lin Author: Dong Lin Reviewers: Jiangjie Qin Closes #2241 from lindong28/KAFKA-4521 (cherry picked from commit 908b6d1148df963d21a70aaa73a7a87571b965a9) Signed-off-by: Jiangjie Qin Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7226d6ed Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7226d6ed Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7226d6ed Branch: refs/heads/0.10.1 Commit: 7226d6ed9d4e61395c624ee875a2b12972ede664 Parents: 395e854 Author: Dong Lin Authored: Thu Dec 15 13:23:01 2016 -0800 Committer: Jiangjie Qin Committed: Thu Dec 15 13:23:47 2016 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/tools/MirrorMaker.scala | 50 ++++++++++++++++++-- 1 file changed, 46 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7226d6ed/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 17b8f0b..114f542 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -470,11 +470,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } def maybeFlushAndCommitOffsets() { - if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { - debug("Committing MirrorMaker state automatically.") + val commitRequested = mirrorMakerConsumer.commitRequested() + if (commitRequested || System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { + debug("Committing MirrorMaker state.") producer.flush() commitOffsets(mirrorMakerConsumer) lastOffsetCommitMs = System.currentTimeMillis() + if (commitRequested) + mirrorMakerConsumer.notifyCommit() } } @@ -503,12 +506,16 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private[kafka] trait MirrorMakerBaseConsumer extends BaseConsumer { def init() + def commitRequested(): Boolean + def notifyCommit() + def requestAndWaitForCommit() def hasData : Boolean } private class MirrorMakerOldConsumer(connector: ZookeeperConsumerConnector, filterSpec: TopicFilter) extends MirrorMakerBaseConsumer { private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null + private var immediateCommitRequested: Boolean = false override def init() { // Creating one stream per each connector instance @@ -518,6 +525,29 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { iter = stream.iterator() } + override def requestAndWaitForCommit() { + this.synchronized { + // skip wait() if mirrorMakerConsumer has not been initialized + if (iter != null) { + immediateCommitRequested = true + this.wait() + } + } + } + + override def notifyCommit() { + this.synchronized { + immediateCommitRequested = false + this.notifyAll() + } + } + + override def commitRequested(): Boolean = { + this.synchronized { + immediateCommitRequested + } + } + override def hasData = iter.hasNext() override def receive() : BaseConsumerRecord = { @@ -570,6 +600,18 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } + override def requestAndWaitForCommit() { + // Do nothing + } + + override def notifyCommit() { + // Do nothing + } + + override def commitRequested(): Boolean = { + false + } + override def hasData = true override def receive() : BaseConsumerRecord = { @@ -632,8 +674,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { extends ConsumerRebalanceListener { override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { - producer.flush() - commitOffsets(mirrorMakerConsumer) + // The zookeeper listener thread, which executes this method, needs to wait for MirrorMakerThread to flush data and commit offset + mirrorMakerConsumer.requestAndWaitForCommit() // invoke custom consumer rebalance listener customRebalanceListenerForOldConsumer.foreach(_.beforeReleasingPartitions(partitionOwnership)) }