Repository: kafka Updated Branches: refs/heads/0.10.1 395e85412 -> 7226d6ed9 KAFKA-4521; MirrorMaker should flush all messages before releasing partition ownership during rebalance Author: Dong Lin Author: Dong Lin Reviewers: Jiangjie Qin Closes #2241 from lindong28/KAFKA-4521 (cherry picked from commit 908b6d1148df963d21a70aaa73a7a87571b965a9) Signed-off-by: Jiangjie Qin Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7226d6ed Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7226d6ed Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7226d6ed Branch: refs/heads/0.10.1 Commit: 7226d6ed9d4e61395c624ee875a2b12972ede664 Parents: 395e854 Author: Dong Lin Authored: Thu Dec 15 13:23:01 2016 -0800 Committer: Jiangjie Qin Committed: Thu Dec 15 13:23:47 2016 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/tools/MirrorMaker.scala | 50 ++++++++++++++++++-- 1 file changed, 46 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7226d6ed/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 17b8f0b..114f542 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -470,11 +470,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } def maybeFlushAndCommitOffsets() { - if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { - debug("Committing MirrorMaker state automatically.") + val commitRequested = mirrorMakerConsumer.commitRequested() + if (commitRequested || System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { + debug("Committing MirrorMaker state.") producer.flush() commitOffsets(mirrorMakerConsumer) lastOffsetCommitMs = System.currentTimeMillis() + if (commitRequested) + mirrorMakerConsumer.notifyCommit() } } @@ -503,12 +506,16 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private[kafka] trait MirrorMakerBaseConsumer extends BaseConsumer { def init() + def commitRequested(): Boolean + def notifyCommit() + def requestAndWaitForCommit() def hasData : Boolean } private class MirrorMakerOldConsumer(connector: ZookeeperConsumerConnector, filterSpec: TopicFilter) extends MirrorMakerBaseConsumer { private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null + private var immediateCommitRequested: Boolean = false override def init() { // Creating one stream per each connector instance @@ -518,6 +525,29 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { iter = stream.iterator() } + override def requestAndWaitForCommit() { + this.synchronized { + // skip wait() if mirrorMakerConsumer has not been initialized + if (iter != null) { + immediateCommitRequested = true + this.wait() + } + } + } + + override def notifyCommit() { + this.synchronized { + immediateCommitRequested = false + this.notifyAll() + } + } + + override def commitRequested(): Boolean = { + this.synchronized { + immediateCommitRequested + } + } + override def hasData = iter.hasNext() override def receive() : BaseConsumerRecord = { @@ -570,6 +600,18 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } + override def requestAndWaitForCommit() { + // Do nothing + } + + override def notifyCommit() { + // Do nothing + } + + override def commitRequested(): Boolean = { + false + } + override def hasData = true override def receive() : BaseConsumerRecord = { @@ -632,8 +674,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { extends ConsumerRebalanceListener { override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { - producer.flush() - commitOffsets(mirrorMakerConsumer) + // The zookeeper listener thread, which executes this method, needs to wait for MirrorMakerThread to flush data and commit offset + mirrorMakerConsumer.requestAndWaitForCommit() // invoke custom consumer rebalance listener customRebalanceListenerForOldConsumer.foreach(_.beforeReleasingPartitions(partitionOwnership)) }