From jjko...@apache.org
Subject kafka git commit: KAFKA-1650; (Follow-up patch) to support no data loss in mirror maker; reviewed by Joel Koshy
Date Thu, 25 Dec 2014 06:23:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 84521872d -> 10c6dec34


KAFKA-1650; (Follow-up patch) to support no data loss in mirror maker; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10c6dec3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10c6dec3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10c6dec3

Branch: refs/heads/trunk
Commit: 10c6dec34dae8820bf7ce24839c938135f9a9189
Parents: 8452187
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Tue Dec 23 17:04:45 2014 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue Dec 23 17:04:45 2014 -0800

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerConnector.scala      |  15 +
 .../consumer/ZookeeperConsumerConnector.scala   |  56 ++--
 .../consumer/ZookeeperConsumerConnector.scala   |   9 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    | 335 +++++++++++--------
 .../scala/kafka/utils/DoublyLinkedList.scala    | 126 +++++++
 .../test/scala/unit/kafka/utils/UtilsTest.scala |  50 +++
 6 files changed, 423 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10c6dec3/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 62c0686..384be74 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -17,6 +17,9 @@
 
 package kafka.consumer
 
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.javaapi.consumer.ConsumerRebalanceListener
+
 import scala.collection._
 import kafka.utils.Logging
 import kafka.serializer._
@@ -76,6 +79,18 @@ trait ConsumerConnector {
    * KAFKA-1743: This method added for backward compatibility.
    */
   def commitOffsets
+
+  /**
+   * Commit offsets from an external offsets map.
+   * @param offsetsToCommit the offsets to be committed.
+   */
+  def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean)
+
+  /**
+   * Wire in a consumer rebalance listener to be executed when consumer rebalance occurs.
+   * @param listener The consumer rebalance listener to wire in
+   */
+  def setConsumerRebalanceListener(listener: ConsumerRebalanceListener)
   
   /**
    *  Shut down the connector
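
For context, a hedged usage sketch of the two methods this hunk adds to ConsumerConnector: committing an externally built offset map and wiring in a rebalance listener. The connector construction, topic name, offset value, and listener body are illustrative assumptions, not part of this patch.

    import java.util.Properties
    import kafka.common.{OffsetAndMetadata, TopicAndPartition}
    import kafka.consumer.{Consumer, ConsumerConfig}
    import kafka.javaapi.consumer.ConsumerRebalanceListener

    // Hypothetical caller; the properties are assumed to carry zookeeper.connect, group.id, etc.
    val consumerProps = new Properties()
    val connector = Consumer.create(new ConsumerConfig(consumerProps))

    // Commit an offset tracked outside the connector, retrying on failure.
    val offsets = Map(TopicAndPartition("my-topic", 0) -> OffsetAndMetadata(42L))
    connector.commitOffsets(offsets, true)

    // Run custom logic before partitions are released during a rebalance.
    connector.setConsumerRebalanceListener(new ConsumerRebalanceListener {
      override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
        // e.g. flush in-flight work and commit offsets here
      }
    })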

http://git-wip-us.apache.org/repos/asf/kafka/blob/10c6dec3/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e991d21..191a867 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -302,25 +302,28 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def commitOffsets { commitOffsets(true) }
 
   def commitOffsets(isAutoCommit: Boolean) {
-    commitOffsets(isAutoCommit, null)
+
+    val offsetsToCommit =
+      immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
+        partitionTopicInfos.map { case (partition, info) =>
+          TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
+        }
+      }.toSeq: _*)
+
+    commitOffsets(offsetsToCommit, isAutoCommit)
+
   }
 
-  def commitOffsets(isAutoCommit: Boolean,
-                    topicPartitionOffsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]) {
-    var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // no retries for commits from auto-commit
+  def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) {
+    trace("OffsetMap: %s".format(offsetsToCommit))
+    var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
     var done = false
-
     while (!done) {
-      val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors
-        val offsetsToCommit = if (topicPartitionOffsets == null) {immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
-          partitionTopicInfos.map { case (partition, info) =>
-            TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
-          }
-        }.toSeq:_*)} else topicPartitionOffsets
-
+      val committed = offsetsChannelLock synchronized {
+        // committed when we receive either no error codes or only MetadataTooLarge errors
         if (offsetsToCommit.size > 0) {
           if (config.offsetsStorage == "zookeeper") {
-            offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) =>
+            offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) =>
               commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
             }
             true
@@ -334,25 +337,25 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               trace("Offset commit response: %s.".format(offsetCommitResponse))
 
               val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
-                offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) =>
+                offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) =>
 
                   if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) {
-                      val offset = offsetsToCommit(topicPartition).offset
-                      commitOffsetToZooKeeper(topicPartition, offset)
+                    val offset = offsetsToCommit(topicPartition).offset
+                    commitOffsetToZooKeeper(topicPartition, offset)
                   }
 
                   (folded._1 || // update commitFailed
-                     errorCode != ErrorMapping.NoError,
+                    errorCode != ErrorMapping.NoError,
 
-                  folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
-                    (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode),
+                    folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
+                      (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode),
 
-                  folded._3 || // update shouldRefreshCoordinator
-                    errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
-                    errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode,
+                    folded._3 || // update shouldRefreshCoordinator
+                      errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
+                      errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode,
 
-                  // update error count
-                  folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0))
+                    // update error count
+                    folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0))
                 }
               }
               debug(errorCount + " errors in offset commit response.")
@@ -381,11 +384,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         }
       }
 
-      done = if (isShuttingDown.get() && isAutoCommit) { // should not retry indefinitely if shutting down
+      done = {
         retriesRemaining -= 1
         retriesRemaining == 0 || committed
-      } else
-        true
+      }
 
       if (!done) {
         debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs))

http://git-wip-us.apache.org/repos/asf/kafka/blob/10c6dec3/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 9baad34..bfd8d37 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -18,9 +18,8 @@ package kafka.javaapi.consumer
 
 import kafka.serializer._
 import kafka.consumer._
-import kafka.common.MessageStreamsExistException
-import scala.collection.mutable
-import scala.collection.JavaConversions
+import kafka.common.{OffsetAndMetadata, TopicAndPartition, MessageStreamsExistException}
+import scala.collection.{immutable, mutable, JavaConversions}
 import java.util.concurrent.atomic.AtomicBoolean
 
 /**
@@ -115,6 +114,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     underlying.commitOffsets(retryOnFailure)
   }
 
+  def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) {
+    underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition, OffsetAndMetadata]], retryOnFailure)
+  }
+
   def setConsumerRebalanceListener(consumerRebalanceListener: ConsumerRebalanceListener) {
     underlying.setConsumerRebalanceListener(consumerRebalanceListener)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/10c6dec3/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 53cb16c..191542c 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -23,10 +23,11 @@ import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.utils._
 import kafka.consumer._
 import kafka.serializer._
-import kafka.producer.{OldProducer, NewShinyProducer}
+import kafka.producer.{KeyedMessage, ProducerConfig}
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata, ProducerRecord}
+import org.apache.kafka.common.KafkaException
 
 import scala.collection.JavaConversions._
 
@@ -47,10 +48,14 @@ import java.util.concurrent._
  *
  * If new producer is used, the offset will be committed based on the new producer's callback. An offset map is
  * maintained and updated on each send() callback. A separate offset commit thread will commit the offset periodically.
- * @note For mirror maker, MaxInFlightRequests of producer should be set to 1 for producer if the order of the messages
- *       needs to be preserved. Mirror maker also depends on the in-order delivery to guarantee no data loss.
- *       We are not force it to be 1 because in some use cases throughput might be important whereas out of order or
- *       minor data loss is acceptable.
+ * @note For mirror maker, the following settings are required to make sure there is no data loss:
+ *       1. use new producer with following settings
+ *            acks=all
+ *            retries=max integer
+ *            block.on.buffer.full=true
+ *       2. Consumer Settings
+ *            auto.commit.enable=false
+ *       If the --no.data.loss flag is set, then these settings are applied automatically.
  */
 object MirrorMaker extends Logging with KafkaMetricsGroup {
 
@@ -58,23 +63,39 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   private var consumerThreads: Seq[ConsumerThread] = null
   private var producerThreads: Seq[ProducerThread] = null
   private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
-  private var offsetCommitThread: OffsetCommitThread = null
+  private val scheduler: KafkaScheduler = new KafkaScheduler(threads = 1)
 
-  private val valueFactory = (k: TopicAndPartition) => new Pool[Int, Long]
-  private val topicPartitionOffsetMap: Pool[TopicAndPartition, Pool[Int, Long]] =
-      new Pool[TopicAndPartition, Pool[Int,Long]](Some(valueFactory))
+  private val unackedOffsetsMap: Pool[TopicAndPartition, UnackedOffsets] =
+      new Pool[TopicAndPartition, UnackedOffsets](valueFactory = Some((k: TopicAndPartition) => new UnackedOffsets))
   // Track the messages unacked for consumer rebalance
-  private var numMessageUnacked: AtomicInteger = new AtomicInteger(0)
-  private var consumerRebalanceListener: MirrorMakerConsumerRebalanceListener = null
+  private var numUnackedMessages: AtomicInteger = new AtomicInteger(0)
+  private var numSkippedUnackedMessages: AtomicInteger = new AtomicInteger(0)
+  private var consumerRebalanceListener: ConsumerRebalanceListener = null
   // This is to indicate whether the rebalance is going on so the producer callback knows if
-  // the rebalance latch needs to be pulled.
-  private var inRebalance: AtomicBoolean = new AtomicBoolean(false)
+  // the flag indicates internal consumer rebalance callback is waiting for all the messages sent to be acked.
+  private var waitingForMessageAcks: Boolean = false
 
   private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes)
 
-  newGauge("MirrorMaker-Unacked-Messages",
+  newGauge("MirrorMaker-NumUnackedMessages",
     new Gauge[Int] {
-      def value = numMessageUnacked.get()
+      def value = numUnackedMessages.get()
+    })
+
+  // The number of unacked offsets in the unackedOffsetsMap
+  newGauge("MirrorMaker-UnackedOffsetListsSize",
+    new Gauge[Int] {
+      def value = unackedOffsetsMap.iterator.map{
+        case(_, unackedOffsets) => unackedOffsets.size
+      }.sum
+    })
+
+  // If a message send fails after retries are exhausted, the offset of that message will also be removed from
+  // the unacked offset list to avoid the offset commit getting stuck on that offset. In this case, the offset of that
+  // message was not really acked, but was skipped. This metric records the number of skipped offsets.
+  newGauge("MirrorMaker-NumSkippedOffsets",
+    new Gauge[Int] {
+      def value = numSkippedUnackedMessages.get()
     })
 
   def main(args: Array[String]) {
@@ -98,6 +119,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     val useNewProducerOpt = parser.accepts("new.producer",
       "Use the new producer implementation.")
 
+    val noDataLossOpt = parser.accepts("no.data.loss",
+      "Configure the mirror maker to have no data loss.")
+
     val numProducersOpt = parser.accepts("num.producers",
       "Number of producer instances")
       .withRequiredArg()
@@ -145,6 +169,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(60000)
 
+    val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener",
+      "The consumer rebalance listener to use for mirror maker consumer.")
+      .withRequiredArg()
+      .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
+      .ofType(classOf[String])
+
     val helpOpt = parser.accepts("help", "Print this message.")
     
     if(args.length == 0)
@@ -170,7 +200,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
 
     // create consumer connector
-    val consumerConfigProps = Utils.loadProps(options.valuesOf(consumerConfigOpt).head)
+    val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
+    val noDataLoss = options.has(noDataLossOpt)
+    // disable consumer auto commit because offset will be committed by offset commit thread.
+    if (noDataLoss)
+      consumerConfigProps.setProperty("auto.commit.enable","false")
     val consumerConfig = new ConsumerConfig(consumerConfigProps)
     connector = new ZookeeperConsumerConnector(consumerConfig)
 
@@ -178,20 +212,30 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numInputs = numStreams, numOutputs = numProducers)
 
     // set consumer rebalance listener
-    // Customized consumer rebalance listener should extend MirrorMakerConsumerRebalanceListener
-    // and take datachannel as argument.
-    val customRebalanceListenerClass = consumerConfigProps.getProperty("consumer.rebalance.listener")
-    consumerRebalanceListener = {
-      if (customRebalanceListenerClass == null) {
-        new MirrorMakerConsumerRebalanceListener(mirrorDataChannel)
-      } else
-        Utils.createObject[MirrorMakerConsumerRebalanceListener](customRebalanceListenerClass, mirrorDataChannel)
+    // custom rebalance listener will be invoked after internal listener finishes its work.
+    val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
+    val customRebalanceListener = {
+      if (customRebalanceListenerClass != null)
+        Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)
+      else
+        null
     }
+    consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, Some(customRebalanceListener))
     connector.setConsumerRebalanceListener(consumerRebalanceListener)
 
     // create producer threads
-    val useNewProducer = options.has(useNewProducerOpt)
     val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+    val useNewProducer = {
+      // Override producer settings if no.data.loss is set
+      if (noDataLoss) {
+        producerProps.setProperty("retries",Int.MaxValue.toString)
+        producerProps.setProperty("block.on.buffer.full", "true")
+        producerProps.setProperty("acks","all")
+        true
+      } else {
+        options.has(useNewProducerOpt)
+      }
+    }
     val clientId = producerProps.getProperty("client.id", "")
     producerThreads = (0 until numProducers).map(i => {
       producerProps.setProperty("client.id", clientId + "-" + i)
@@ -203,17 +247,17 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       new ProducerThread(mirrorDataChannel, producer, i)
     })
 
-    // create offset commit thread
-    if (useNewProducer) {
+    // start offset commit thread
+    if (noDataLoss) {
       /**
-       * The offset commit thread periodically commit consumed offsets to the source cluster. With the new producer,
+       * The offset commit thread periodically commit consumed offsets. With the new producer,
        * the offsets are updated upon the returned future metadata of the send() call; with the old producer,
        * the offsets are updated upon the consumer's iterator advances. By doing this, it is guaranteed no data
        * loss even when mirror maker is uncleanly shutdown with the new producer, while with the old producer
        * messages inside the data channel could be lost upon mirror maker unclean shutdown.
        * messages inside the data channel could be lost upon mirror maker unclean shutdown.
        */
-      offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs)
-      offsetCommitThread.start()
+      scheduler.startup()
+      scheduler.schedule("offset-commit", commitOffsets, 0, offsetCommitIntervalMs, TimeUnit.MILLISECONDS)
     }
 
     // create consumer threads
@@ -252,7 +296,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   def cleanShutdown() {
     if (isShuttingdown.compareAndSet(false, true)) {
       info("Start clean shutdown.")
-      // Consumer threads will exit when isCleanShutdown is set.
+      // Shutdown consumer threads.
       info("Shutting down consumer threads.")
       if (consumerThreads != null) {
         consumerThreads.foreach(_.shutdown())
@@ -265,12 +309,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         producerThreads.foreach(_.awaitShutdown())
       }
       // offset commit thread should only be shutdown after producer threads are shutdown, so we don't lose offsets.
-      info("Shutting down offset commit thread.")
-      if (offsetCommitThread != null) {
-        offsetCommitThread.shutdown()
-        offsetCommitThread.awaitShutdown()
-      }
-      // connector can only be shutdown after offsets are committed.
+      scheduler.shutdown()
+      swallow(commitOffsets())
+
+      // connector should only be shutdown after offsets are committed.
       info("Shutting down consumer connectors.")
       if (connector != null)
         connector.shutdown()
@@ -457,155 +499,134 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  class OffsetCommitThread(commitIntervalMs: Int) extends Thread with Logging with KafkaMetricsGroup {
-    private val threadName = "mirrormaker-offset-commit-thread"
-    private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
-    this.logIdent = "[%s]".format(threadName)
-    var shutdownFlag: Boolean = false
-    var commitCounter: Int = 0
-
-    this.setName(threadName)
-
-    newGauge("MirrorMaker-Offset-Commit-Counter",
-      new Gauge[Int] {
-        def value = commitCounter
-      })
-
-    /**
-     * Use the connector to commit all the offsets.
-     */
-    override def run() {
-      info("Starting mirror maker offset commit thread")
-      try {
-        while (!shutdownFlag) {
-          Thread.sleep(commitIntervalMs)
-          commitOffset()
-        }
-      } catch {
-        case t: Throwable => fatal("Exits due to", t)
-      } finally {
-        swallow(commitOffset())
-        shutdownComplete.countDown()
-        info("Offset commit thread exited")
-        if (!isShuttingdown.get()) {
-          fatal("Offset commit thread exited abnormally, stopping the whole mirror maker.")
-          System.exit(-1)
-        }
-      }
-    }
-
-    def commitOffset() {
-      val offsetsToCommit = collection.immutable.Map(topicPartitionOffsetMap.map {
-        case (topicPartition, partitionOffsetMap) =>
-        topicPartition -> OffsetAndMetadata(getOffsetToCommit(partitionOffsetMap), null)
+  private def commitOffsets()  {
+    try {
+      info("Committing offsets")
+      val offsetsToCommit = collection.immutable.Map(unackedOffsetsMap.map {
+        case (topicPartition, unackedOffsets) =>
+          topicPartition -> OffsetAndMetadata(unackedOffsets.getOffsetToCommit, null)
       }.toSeq: _*)
-      trace("committing offset: %s".format(offsetsToCommit))
       if (connector == null) {
         warn("No consumer connector available to commit offset.")
       } else {
-        connector.commitOffsets(
-          isAutoCommit = false,
-          topicPartitionOffsets = offsetsToCommit
-        )
-        commitCounter += 1
-      }
-    }
-
-    private def getOffsetToCommit(offsetsMap: Pool[Int, Long]): Long = {
-      val offsets = offsetsMap.map(_._2).toSeq.sorted
-      val iter = offsets.iterator
-      var offsetToCommit = iter.next()
-      while (iter.hasNext && offsetToCommit + 1 == iter.next())
-        offsetToCommit += 1
-      // The committed offset will be the first offset of un-consumed message, hence we need to increment by one.
-      offsetToCommit + 1
-    }
-
-    def shutdown() {
-      shutdownFlag = true
-    }
-
-    def awaitShutdown() {
-      try {
-        shutdownComplete.await()
-        info("Offset commit thread shutdown complete")
-      } catch {
-        case ie: InterruptedException => {
-          warn("Shutdown of the offset commit thread interrupted")
-        }
+        connector.commitOffsets(offsetsToCommit, isAutoCommit = false)
       }
+    } catch {
+      case e: OutOfMemoryError =>
+        fatal("Shutting down mirror maker due to error when committing offsets.", e)
+        System.exit(-1)
+      case t: Throwable =>
+        warn("Offsets commit failed due to ", t)
     }
   }
 
   private[kafka] trait MirrorMakerBaseProducer {
-    def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte])
+    def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte])
     def close()
   }
 
-  private class MirrorMakerNewProducer (val producerProps: Properties)
-      extends NewShinyProducer(producerProps) with MirrorMakerBaseProducer {
+  private class MirrorMakerNewProducer (val producerProps: Properties) extends MirrorMakerBaseProducer {
 
-    override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
-      val record = new ProducerRecord[Array[Byte],Array[Byte]](topicPartition.topic, key, value)
+    val sync = producerProps.getProperty("producer.type", "async").equals("sync")
+
+    val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+    override def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte]) {
+      val record = new ProducerRecord(sourceTopicPartition.topic, key, value)
       if(sync) {
-        topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(this.producer.send(record).get().partition(), offset)
+        this.producer.send(record).get()
+        unackedOffsetsMap.getAndMaybePut(sourceTopicPartition).maybeUpdateMaxOffsetSeen(sourceOffset)
       } else {
-        this.producer.send(record,
-          new MirrorMakerProducerCallback(topicPartition, offset, key, value))
-        numMessageUnacked.incrementAndGet()
+
+        val unackedOffsets = unackedOffsetsMap.getAndMaybePut(sourceTopicPartition)
+        // synchronize to ensure that addOffset precedes removeOffset
+        unackedOffsets synchronized {
+          val unackedOffset = new UnackedOffset(sourceOffset)
+          this.producer.send(record,
+            new MirrorMakerProducerCallback(sourceTopicPartition, unackedOffset, key, value))
+          // add offset to unackedOffsets
+          unackedOffsets.addOffset(unackedOffset)
+          numUnackedMessages.incrementAndGet()
+        }
       }
     }
+
+    override def close() {
+      this.producer.close()
+    }
   }
 
-  private class MirrorMakerOldProducer (val producerProps: Properties)
-      extends OldProducer(producerProps) with MirrorMakerBaseProducer {
+  private class MirrorMakerOldProducer (val producerProps: Properties) extends MirrorMakerBaseProducer {
+
+    // default to byte array partitioner
+    if (producerProps.getProperty("partitioner.class") == null)
+      producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)
+    val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
 
     override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
-      super.send(topicPartition.topic, key, value)
+      this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topicPartition.topic, key, value))
     }
 
     override def close() {
-      super.close()
+      this.producer.close()
     }
   }
 
   private class MirrorMakerProducerCallback (val topicPartition: TopicAndPartition,
-                                             val offset: Long,
+                                             val offset: UnackedOffset,
                                              val key: Array[Byte],
                                              val value: Array[Byte])
     extends ErrorLoggingCallback(topicPartition.topic, key, value, false) {
 
     override def onCompletion(metadata: RecordMetadata, exception: Exception) {
       if (exception != null) {
-        // Use default call back to log error
+        // Use default call back to log error. This means the producer's max retries have been reached and the message
+        // still could not be sent. In this case we have to remove the offset from the list to let the mirror maker
+        // move on. The message that failed to be sent will be lost in the target cluster.
+        warn("Not able to send message, offset of " + topicPartition + " will not advance. Total number " +
+          "of skipped unacked messages is " + numSkippedUnackedMessages.incrementAndGet())
         super.onCompletion(metadata, exception)
       } else {
-        trace("updating offset:[%s] -> %d".format(topicPartition, offset))
-        topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(metadata.partition(), offset)
+        trace("Updating offset for %s to %d".format(topicPartition, offset))
       }
+      // remove the offset from the unackedOffsets
+      val unackedOffsets = unackedOffsetsMap.get(topicPartition)
+      unackedOffsets.removeOffset(offset)
       // Notify the rebalance callback only when all the messages handed to producer are acked.
-      // There is a very slight chance that 1 message is held by producer thread and not handed to producer.
+      // There is a very slight chance that one message is held by producer thread and not handed to producer.
       // That message might have duplicate. We are not handling that here.
-      if (numMessageUnacked.decrementAndGet() == 0 && inRebalance.get()) {
-        inRebalance synchronized {inRebalance.notify()}
+      numUnackedMessages synchronized {
+        if (numUnackedMessages.decrementAndGet() == 0 && waitingForMessageAcks) {
+            numUnackedMessages.notify()
+        }
       }
     }
   }
 
-  class MirrorMakerConsumerRebalanceListener (dataChannel: DataChannel) extends ConsumerRebalanceListener {
+  class InternalRebalanceListener (dataChannel: DataChannel, customRebalanceListener: Option[ConsumerRebalanceListener])
+      extends ConsumerRebalanceListener {
 
     override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
       info("Clearing data channel.")
       dataChannel.clear()
       info("Waiting until all the messages are acked.")
-      inRebalance synchronized {
-        inRebalance.set(true)
-        while (numMessageUnacked.get() > 0)
-          inRebalance.wait()
+      numUnackedMessages synchronized {
+        waitingForMessageAcks = true
+        while (numUnackedMessages.get() > 0) {
+          try {
+            numUnackedMessages.wait()
+          } catch {
+            case e: InterruptedException => info("Ignoring interrupt while waiting.")
+          }
+        }
+        waitingForMessageAcks = false
       }
       info("Committing offsets.")
-      offsetCommitThread.commitOffset()
-      inRebalance.set(true)
+      commitOffsets()
+
+      // invoke custom consumer rebalance listener
+      if (customRebalanceListener.isDefined)
+        customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership)
     }
   }
 
@@ -617,5 +638,43 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     def size = value.length + {if (key == null) 0 else key.length}
   }
 
-}
+  private class UnackedOffset(offset: Long) extends DoublyLinkedListNode[Long](offset) {
+
+  }
+
+  private class UnackedOffsets {
+    val offsetList = new DoublyLinkedList[Long]
+    var maxOffsetSeen: Long = -1L
+
+    def maybeUpdateMaxOffsetSeen(offset: Long) {
+      this synchronized {
+        maxOffsetSeen = math.max(maxOffsetSeen, offset)
+      }
+    }
 
+    def addOffset(offset: DoublyLinkedListNode[Long]) {
+      this synchronized {
+        offsetList.add(offset)
+        maybeUpdateMaxOffsetSeen(offset.element)
+      }
+    }
+
+    def removeOffset(offset: DoublyLinkedListNode[Long]) {
+      offsetList.remove(offset)
+    }
+
+    def getOffsetToCommit: Long = {
+      this synchronized {
+        val smallestUnackedOffset = offsetList.peek()
+        if (smallestUnackedOffset == null)
+          // list is empty, commit maxOffsetSeen + 1
+          maxOffsetSeen + 1
+        else
+          // commit the smallest unacked offset
+          smallestUnackedOffset.element
+      }
+    }
+
+    def size: Int = offsetList.size
+  }
+}
\ No newline at end of file
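
To make the @note in MirrorMaker.scala above concrete, here is a minimal sketch of the settings the --no.data.loss flag enforces, written out as plain Properties. The property names are the ones set in this patch; the broker address, ZooKeeper connect string, and group id are illustrative assumptions.

    import java.util.Properties

    object NoDataLossConfigSketch extends App {
      // Producer side: new producer with unlimited retries, acks from all replicas,
      // and blocking (rather than dropping) when the buffer is full.
      val producerProps = new Properties()
      producerProps.setProperty("bootstrap.servers", "target-broker:9092") // illustrative
      producerProps.setProperty("acks", "all")
      producerProps.setProperty("retries", Int.MaxValue.toString)
      producerProps.setProperty("block.on.buffer.full", "true")

      // Consumer side: disable auto commit so offsets are committed only by the
      // offset commit thread, after the corresponding sends have been acked.
      val consumerProps = new Properties()
      consumerProps.setProperty("zookeeper.connect", "source-zk:2181") // illustrative
      consumerProps.setProperty("group.id", "mirror-maker-group")      // illustrative
      consumerProps.setProperty("auto.commit.enable", "false")

      println("producer: " + producerProps)
      println("consumer: " + consumerProps)
    }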

http://git-wip-us.apache.org/repos/asf/kafka/blob/10c6dec3/core/src/main/scala/kafka/utils/DoublyLinkedList.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/DoublyLinkedList.scala b/core/src/main/scala/kafka/utils/DoublyLinkedList.scala
new file mode 100644
index 0000000..e637ef3
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/DoublyLinkedList.scala
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+/**
+ * Simple doubly LinkedList node
+ * @param element The element
+ * @tparam T The type of element
+ */
+class DoublyLinkedListNode[T] (val element: T) {
+  var prev: DoublyLinkedListNode[T] = null
+  var next: DoublyLinkedListNode[T] = null
+}
+
+/**
+ * A simple doubly linked list util to allow O(1) remove.
+ * @tparam T type of element in nodes
+ */
+@threadsafe
+class DoublyLinkedList[T] {
+  private var head: DoublyLinkedListNode[T] = null
+  private var tail: DoublyLinkedListNode[T] = null
+  @volatile private var listSize: Int = 0
+
+  /**
+   * Add offset to the tail of the list
+   * @param node the node to be added to the tail of the list
+   */
+  def add (node: DoublyLinkedListNode[T]) {
+    this synchronized {
+      if (head == null) {
+        // empty list
+        head = node
+        tail = node
+        node.prev = null
+        node.next = null
+      } else {
+        // add to tail
+        tail.next = node
+        node.next = null
+        node.prev = tail
+        tail = node
+      }
+      listSize += 1
+    }
+  }
+
+  /**
+   * Remove a node from the list. The list will not check if the node is really in the list.
+   * @param node the node to be removed from the list
+   */
+  def remove (node: DoublyLinkedListNode[T]) {
+    this synchronized {
+      if (node ne head)
+        node.prev.next = node.next
+      else
+        head = node.next
+
+      if (node ne tail)
+        node.next.prev = node.prev
+      else
+        tail = node.prev
+
+      node.prev = null
+      node.next = null
+
+      listSize -= 1
+    }
+  }
+
+  /**
+   * Remove the first node in the list and return it if the list is not empty.
+   * @return The first node in the list if the list is not empty. Return Null if the list is empty.
+   */
+  def remove(): DoublyLinkedListNode[T] = {
+    this synchronized {
+      if (head != null) {
+        val node = head
+        remove(head)
+        node
+      } else {
+        null
+      }
+    }
+  }
+
+  /**
+   * Get the first node in the list without removing it.
+   * @return the first node in the list.
+   */
+  def peek(): DoublyLinkedListNode[T] = head
+
+  def size: Int = listSize
+
+  def iterator: Iterator[DoublyLinkedListNode[T]] = {
+    new IteratorTemplate[DoublyLinkedListNode[T]] {
+      var current = head
+      override protected def makeNext(): DoublyLinkedListNode[T] = {
+        this synchronized {
+          if (current != null) {
+            val nextNode = current
+            current = current.next
+            nextNode
+          } else {
+            allDone()
+          }
+        }
+      }
+    }
+  }
+}
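
A short usage sketch of the new DoublyLinkedList, illustrating why MirrorMaker hands each send its own node: holding the node reference makes removing an arbitrary acked offset O(1), while the head stays the smallest unacked offset, which is what UnackedOffsets commits. The object name and sample offsets are made up.

    import kafka.utils.{DoublyLinkedList, DoublyLinkedListNode}

    object DoublyLinkedListSketch extends App {
      val pending = new DoublyLinkedList[Long]

      // The caller allocates the nodes and keeps references to them;
      // that is what makes the later remove(node) calls O(1).
      val nodes = (100L to 104L).map(offset => new DoublyLinkedListNode(offset))
      nodes.foreach(pending.add)

      // Acks can arrive out of order: drop a node from the middle of the list.
      pending.remove(nodes(2))

      // The head is still the smallest outstanding offset.
      println(pending.peek().element) // prints 100
      println(pending.size)           // prints 4
    }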

http://git-wip-us.apache.org/repos/asf/kafka/blob/10c6dec3/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 066553c..8c3797a 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -150,4 +150,54 @@ class UtilsTest extends JUnitSuite {
     assertEquals(2, result)
     assertFalse("Should be unlocked", lock.isLocked)
   }
+
+  @Test
+  def testDoublyLinkedList() {
+    val list = new DoublyLinkedList[Int]
+
+    // test remove from a single-entry list.
+    list.add(new DoublyLinkedListNode[Int](0))
+    list.remove()
+    assert(list.size == 0)
+    assert(list.peek() == null)
+
+    // test add
+    for (i <- 0 to 2) {
+      list.add(new DoublyLinkedListNode[Int](i))
+    }
+    val toBeRemoved1 = new DoublyLinkedListNode[Int](3)
+    list.add(toBeRemoved1)
+    for (i <- 4 to 6) {
+      list.add(new DoublyLinkedListNode[Int](i))
+    }
+    val toBeRemoved2 = new DoublyLinkedListNode[Int](7)
+    list.add(toBeRemoved2)
+
+    // test iterator
+    val iter = list.iterator
+    for (i <- 0 to 7) {
+      assert(iter.hasNext)
+      assert(iter.next().element == i)
+    }
+    assert(!iter.hasNext)
+
+    // remove from head
+    list.remove()
+    assert(list.peek().element == 1)
+    // remove from middle
+    list.remove(toBeRemoved1)
+    // remove from tail
+    list.remove(toBeRemoved2)
+
+    // List = [1,2,4,5,6]
+    val iter2 = list.iterator
+    for (i <- Array[Int](1,2,4,5,6)) {
+      assert(iter2.hasNext)
+      assert(iter2.next().element == i)
+    }
+
+    // test size
+    assert(list.size == 5)
+  }
+
 }

