kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-3849; Add explanation on why polling every second in MirrorMaker is required
Date Sat, 09 Jul 2016 01:51:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a5da2a4f4 -> 2dfbf4b5f


KAFKA-3849; Add explanation on why polling every second in MirrorMaker is required

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Grant Henke <granthenke@gmail.com>,
Ismael Juma <ismael@juma.me.uk>

Closes #1515 from SinghAsDev/KAFKA-3849


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2dfbf4b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2dfbf4b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2dfbf4b5

Branch: refs/heads/trunk
Commit: 2dfbf4b5fdd7698a83a1c3d078a846f3674235f4
Parents: a5da2a4
Author: Ashish Singh <asingh@cloudera.com>
Authored: Sat Jul 9 02:32:38 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jul 9 02:32:38 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2dfbf4b5/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 645882d..f800032 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -121,7 +121,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         .ofType(classOf[String])
 
       val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
-        "Offset commit interval in ms")
+        "Offset commit interval in ms.")
         .withRequiredArg()
         .describedAs("offset commit interval in millisecond")
         .ofType(classOf[java.lang.Integer])
@@ -134,7 +134,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         .ofType(classOf[String])
 
       val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
-        "Arguments used by custom rebalance listener for mirror maker consumer")
+        "Arguments used by custom rebalance listener for mirror maker consumer.")
         .withRequiredArg()
         .describedAs("Arguments passed to custom rebalance listener constructor as a string.")
         .ofType(classOf[String])
@@ -554,6 +554,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     override def receive() : BaseConsumerRecord = {
       if (recordIter == null || !recordIter.hasNext) {
+        // In scenarios where data does not arrive within offsetCommitIntervalMs and
+        // offsetCommitIntervalMs is less than poll's timeout, offset commit will be delayed for any
+        // uncommitted record since last poll. Using one second as poll's timeout ensures that
+        // offsetCommitIntervalMs, of value greater than 1 second, does not see delays in offset
+        // commit.
         recordIter = consumer.poll(1000).iterator
         if (!recordIter.hasNext)
           throw new ConsumerTimeoutException
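
The timing argument in the added comment can be illustrated with a small standalone model. This is only a sketch and not MirrorMaker code: the object name PollTimeoutSketch, the function commitTimes and its parameters are hypothetical, and it assumes an idle consumer whose every poll blocks for the full timeout, with the commit-interval check running once after each poll returns.

```scala
object PollTimeoutSketch {
  /**
   * Hypothetical model of an idle mirror loop: each poll blocks for the full
   * pollTimeoutMs, and only after it returns can the loop check whether
   * offsetCommitIntervalMs has elapsed since the last commit.
   * Returns the simulated times (in ms) at which a commit would happen.
   */
  def commitTimes(pollTimeoutMs: Long, offsetCommitIntervalMs: Long, runForMs: Long): Vector[Long] = {
    var now = 0L
    var lastCommitMs = 0L
    val commits = Vector.newBuilder[Long]
    while (now + pollTimeoutMs <= runForMs) {
      now += pollTimeoutMs // assumption: no data arrives, so poll uses its whole timeout
      if (now - lastCommitMs >= offsetCommitIntervalMs) {
        commits += now
        lastCommitMs = now
      }
    }
    commits.result()
  }

  def main(args: Array[String]): Unit = {
    // Poll timeout (1 s) shorter than the commit interval (5 s): commits land on schedule.
    println(commitTimes(pollTimeoutMs = 1000, offsetCommitIntervalMs = 5000, runForMs = 20000))
    // Poll timeout (10 s) longer than the commit interval (5 s): commits are 10 s apart.
    println(commitTimes(pollTimeoutMs = 10000, offsetCommitIntervalMs = 5000, runForMs = 20000))
  }
}
```

In this model a commit can only happen when a poll returns, so a poll timeout longer than the commit interval stretches the effective interval to the poll timeout, which is the delay the comment describes.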

