kafka-commits mailing list archives

From lind...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7211; MM should handle TimeoutException in commitSync
Date Tue, 04 Sep 2018 20:56:26 GMT
This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 79a608b  KAFKA-7211; MM should handle TimeoutException in commitSync
79a608b is described below

commit 79a608b286fd6271b841d3cc7997bf69227de912
Author: huxihx <huxi_2b@hotmail.com>
AuthorDate: Tue Sep 4 13:55:49 2018 -0700

    KAFKA-7211; MM should handle TimeoutException in commitSync
    
    With KIP-266 introduced, MirrorMaker should handle the TimeoutException thrown by commitSync().
    In addition, MM should only commit offsets for existing topics.
    
    Author: huxihx <huxi_2b@hotmail.com>
    
    Reviewers: Dong Lin <lindong28@gmail.com>
    
    Closes #5492 from huxihx/KAFKA-7211
---
 core/src/main/scala/kafka/tools/MirrorMaker.scala  | 67 +++++++++++++++-------
 .../kafka/tools/MirrorMakerIntegrationTest.scala   | 30 ++++++++++
 2 files changed, 76 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d55d96b..b6fd918 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -34,17 +34,18 @@ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.errors.{TimeoutException, WakeupException}
 import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}
 import scala.util.control.ControlThrowable
 
 /**
  * The mirror maker has the following architecture:
- * - There are N mirror maker thread shares one ZookeeperConsumerConnector and each owns a Kafka stream.
+ * - There are N mirror maker thread, each of which is equipped with a separate KafkaConsumer instance.
  * - All the mirror maker threads share one producer.
  * - Each mirror maker thread periodically flushes the producer and then commits all offsets.
  *
@@ -70,6 +71,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   private var offsetCommitIntervalMs = 0
   private var abortOnSendFailure: Boolean = true
   @volatile private var exitingOnSendFailure: Boolean = false
+  private var lastSuccessfulCommitTime = -1L
+  private val time = Time.SYSTEM
 
   // If a message send failed after retries are exhausted. The offset of the messages will also be removed from
   // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that
@@ -268,24 +271,45 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     consumers.map(consumer => new ConsumerWrapper(consumer, customRebalanceListener, whitelist))
   }
 
-  def commitOffsets(consumerWrapper: ConsumerWrapper) {
+  def commitOffsets(consumerWrapper: ConsumerWrapper): Unit = {
     if (!exitingOnSendFailure) {
-      trace("Committing offsets.")
-      try {
-        consumerWrapper.commit()
-      } catch {
-        case e: WakeupException =>
-          // we only call wakeup() once to close the consumer,
-          // so if we catch it in commit we can safely retry
-          // and re-throw to break the loop
+      var retry = 0
+      var retryNeeded = true
+      while (retryNeeded) {
+        trace("Committing offsets.")
+        try {
           consumerWrapper.commit()
-          throw e
+          lastSuccessfulCommitTime = time.milliseconds
+          retryNeeded = false
+        } catch {
+          case e: WakeupException =>
+            // we only call wakeup() once to close the consumer,
+            // so if we catch it in commit we can safely retry
+            // and re-throw to break the loop
+            commitOffsets(consumerWrapper)
+            throw e
+
+          case _: TimeoutException =>
+            Try(consumerWrapper.consumer.listTopics) match {
+              case Success(visibleTopics) =>
+                consumerWrapper.offsets.retain((tp, _) => visibleTopics.containsKey(tp.topic))
+              case Failure(e) =>
+                warn("Failed to list all authorized topics after committing offsets timed
out: ", e)
+            }
 
-        case _: CommitFailedException =>
-          warn("Failed to commit offsets because the consumer group has rebalanced and assigned
partitions to " +
-            "another instance. If you see this regularly, it could indicate that you need
to either increase " +
-            s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the number
of records " +
-            s"handled on each iteration with ${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
+            retry += 1
+            warn("Failed to commit offsets because the offset commit request processing can
not be completed in time. " +
+              s"If you see this regularly, it could indicate that you need to increase the
consumer's ${ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG} " +
+              s"Last successful offset commit timestamp=$lastSuccessfulCommitTime, retry
count=$retry")
+            Thread.sleep(100)
+
+          case _: CommitFailedException =>
+            retryNeeded = false
+            warn("Failed to commit offsets because the consumer group has rebalanced and
assigned partitions to " +
+              "another instance. If you see this regularly, it could indicate that you need
to either increase " +
+              s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or reduce the
number of records " +
+              s"handled on each iteration with ${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
+        }
       }
     } else {
       info("Exiting on send failure, skip committing offsets.")
@@ -423,14 +447,15 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   // Visible for testing
-  private[tools] class ConsumerWrapper(consumer: Consumer[Array[Byte], Array[Byte]],
+  private[tools] class ConsumerWrapper(private[tools] val consumer: Consumer[Array[Byte], Array[Byte]],
                                        customRebalanceListener: Option[ConsumerRebalanceListener],
                                        whitelistOpt: Option[String]) {
    val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist."))
     var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
 
     // We manually maintain the consumed offsets for historical reasons and it could be simplified
-    private val offsets = new HashMap[TopicPartition, Long]()
+    // Visible for testing
+    private[tools] val offsets = new HashMap[TopicPartition, Long]()
 
     def init() {
       debug("Initiating consumer")
@@ -474,7 +499,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     def commit() {
-      consumer.commitSync(offsets.map { case (tp, offset) =>  (tp, new OffsetAndMetadata(offset, ""))}.asJava)
+      consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset)) }.asJava)
       offsets.clear()
     }
   }
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index 0a17819..7212b3b 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -24,15 +24,45 @@ import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 import org.junit.Test
+import org.junit.Assert._
 
 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
 
   override def generateConfigs: Seq[KafkaConfig] =
     TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
 
+  @Test(expected = classOf[TimeoutException])
+  def testCommitOffsetsThrowTimeoutException(): Unit = {
+    val consumerProps = new Properties
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1")
+    val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, whitelistOpt = Some("any"))
+    mirrorMakerConsumer.offsets.put(new TopicPartition("test", 0), 0L)
+    mirrorMakerConsumer.commit()
+  }
+
+  @Test
+  def testCommitOffsetsRemoveNonExistentTopics(): Unit = {
+    val consumerProps = new Properties
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000")
+    val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, whitelistOpt = Some("any"))
+    mirrorMakerConsumer.offsets.put(new TopicPartition("nonexistent-topic1", 0), 0L)
+    mirrorMakerConsumer.offsets.put(new TopicPartition("nonexistent-topic2", 0), 0L)
+    MirrorMaker.commitOffsets(mirrorMakerConsumer)
+    assertTrue("Offsets for non-existent topics should be removed", mirrorMakerConsumer.offsets.isEmpty)
+  }
+
   @Test
   def testCommaSeparatedRegex(): Unit = {
     val topic = "new-topic"


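For readers skimming the diff: the core of the change is the retry loop in commitOffsets, which retries commitSync when it throws the TimeoutException introduced by KIP-266 and first drops offsets for topics the consumer can no longer list. Below is a minimal, self-contained sketch of that pattern; the object RetrySketch, the method commitWithRetry, and the pendingOffsets map are illustrative names, not identifiers from this patch.

    import scala.collection.JavaConverters._
    import scala.collection.mutable
    import scala.util.{Failure, Success, Try}
    import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.errors.TimeoutException

    object RetrySketch {
      // Commit the pending offsets, retrying on TimeoutException. Offsets for
      // topics that no longer exist are dropped so the commit cannot keep
      // timing out on them forever.
      def commitWithRetry(consumer: Consumer[Array[Byte], Array[Byte]],
                          pendingOffsets: mutable.Map[TopicPartition, Long]): Unit = {
        var done = false
        while (!done) {
          try {
            consumer.commitSync(pendingOffsets.map { case (tp, offset) =>
              (tp, new OffsetAndMetadata(offset))
            }.asJava)
            pendingOffsets.clear()
            done = true
          } catch {
            case _: TimeoutException =>
              // KIP-266: commitSync can now throw TimeoutException instead of
              // blocking indefinitely. Keep only offsets for topics the consumer
              // can still see, then retry after a short pause.
              Try(consumer.listTopics) match {
                case Success(visible) =>
                  pendingOffsets.retain((tp, _) => visible.containsKey(tp.topic))
                case Failure(_) =>
                  // listTopics timed out as well; retry the commit unchanged
              }
              Thread.sleep(100)
          }
        }
      }
    }

The actual MirrorMaker code in the diff above additionally handles WakeupException and CommitFailedException and records the timestamp of the last successful commit for its warning message.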