kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-1997; Refactor MirrorMaker based on KIP-3; reviewed by Joel Koshy and Guozhang Wang
Date: Fri, 13 Mar 2015 22:09:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b7439c808 -> c41c7b40b


KAFKA-1997; Refactor MirrorMaker based on KIP-3; reviewed by Joel Koshy and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c41c7b40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c41c7b40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c41c7b40

Branch: refs/heads/trunk
Commit: c41c7b40b63ecd668c727a897f29e276a1c5adf7
Parents: b7439c8
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Mar 13 15:06:10 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Mar 13 15:07:48 2015 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   |  28 +-
 .../kafka/consumer/PartitionAssignor.scala      |  38 +-
 .../consumer/ZookeeperConsumerConnector.scala   |  45 +-
 .../consumer/ConsumerRebalanceListener.java     |  12 +
 .../main/scala/kafka/tools/MirrorMaker.scala    | 691 ++++++-------------
 .../kafka/consumer/PartitionAssignorTest.scala  |   2 +-
 .../ZookeeperConsumerConnectorTest.scala        |  63 +-
 7 files changed, 358 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c41c7b40/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d5c79e2..88b4e4f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -12,27 +12,14 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -44,6 +31,19 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
  * instances to be sent to the server.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c41c7b40/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index e6ff768..bc2e5b4 100644
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -19,16 +19,18 @@ package kafka.consumer
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.common.TopicAndPartition
-import kafka.utils.{Utils, ZkUtils, Logging}
+import kafka.utils.{Pool, Utils, ZkUtils, Logging}
+
+import scala.collection.mutable
 
 trait PartitionAssignor {
 
   /**
    * Assigns partitions to consumer instances in a group.
-   * @return An assignment map of partition to consumer thread. This only includes assignments for threads that belong
-   *         to the given assignment-context's consumer.
+   * @return The global partition assignment for the group, keyed by consumer id. Each consumer's entry maps its
+   *         assigned partitions to the owning consumer thread.
    */
-  def assign(ctx: AssignmentContext): scala.collection.Map[TopicAndPartition, ConsumerThreadId]
+  def assign(ctx: AssignmentContext): Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]]
 
 }
 
@@ -69,7 +71,10 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo
 class RoundRobinAssignor() extends PartitionAssignor with Logging {
 
   def assign(ctx: AssignmentContext) = {
-    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
+
+    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+    val partitionAssignment =
+      new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
 
     if (ctx.consumersForTopic.size > 0) {
       // check conditions (a) and (b)
@@ -102,12 +107,12 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
 
       allTopicPartitions.foreach(topicPartition => {
         val threadId = threadAssignor.next()
-        if (threadId.consumer == ctx.consumerId)
-          partitionOwnershipDecision += (topicPartition -> threadId)
+        // record the partition ownership decision
+        val assignmentForConsumer = partitionAssignment.getAndMaybePut(threadId.consumer)
+        assignmentForConsumer += (topicPartition -> threadId)
       })
     }
-
-    partitionOwnershipDecision
+    partitionAssignment
   }
 }
 
@@ -123,9 +128,10 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
 class RangeAssignor() extends PartitionAssignor with Logging {
 
   def assign(ctx: AssignmentContext) = {
-    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
-
-    for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
+    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+    val partitionAssignment =
+      new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
+    for (topic <- ctx.myTopicThreadIds.keySet) {
       val curConsumers = ctx.consumersForTopic(topic)
       val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)
 
@@ -135,7 +141,7 @@ class RangeAssignor() extends PartitionAssignor with Logging {
       info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
         " for topic " + topic + " with consumers: " + curConsumers)
 
-      for (consumerThreadId <- consumerThreadIdSet) {
+      for (consumerThreadId <- curConsumers) {
         val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
         assert(myConsumerPosition >= 0)
         val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
@@ -152,12 +158,12 @@ class RangeAssignor() extends PartitionAssignor with Logging {
             val partition = curPartitions(i)
             info(consumerThreadId + " attempting to claim partition " + partition)
             // record the partition ownership decision
-            partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
+            val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
+            assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
           }
         }
       }
     }
-
-    partitionOwnershipDecision
+    partitionAssignment
   }
 }
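
For illustration, a minimal caller-side sketch (not part of this patch) of consuming the new assign() contract, using the Pool, TopicAndPartition and ConsumerThreadId types above. Pool.get returns null for a consumer that received no partitions, so the lookup is guarded with Option; assignmentFor is a hypothetical helper name:

    import kafka.common.TopicAndPartition
    import kafka.consumer.{AssignmentContext, ConsumerThreadId, PartitionAssignor}
    import kafka.utils.Pool
    import scala.collection.mutable

    // Pick out one consumer's slice of the group-wide assignment returned by the assignor.
    def assignmentFor(assignor: PartitionAssignor, ctx: AssignmentContext): mutable.Map[TopicAndPartition, ConsumerThreadId] = {
      val globalAssignment: Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]] = assignor.assign(ctx)
      // Pool.get returns null when this consumer owns nothing, so fall back to an empty map.
      Option(globalAssignment.get(ctx.consumerId))
        .getOrElse(mutable.HashMap.empty[TopicAndPartition, ConsumerThreadId])
    }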

http://git-wip-us.apache.org/repos/asf/kafka/blob/c41c7b40/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 5487259..cca815a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -670,7 +670,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
          */
         closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
         if (consumerRebalanceListener != null) {
-          info("Calling beforeReleasingPartitions() from rebalance listener.")
+          info("Invoking rebalance listener before releasing partition ownerships.")
           consumerRebalanceListener.beforeReleasingPartitions(
             if (topicRegistry.size == 0)
               new java.util.HashMap[String, java.util.Set[java.lang.Integer]]
@@ -682,12 +682,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         }
         releasePartitionOwnership(topicRegistry)
         val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
-        val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
+        val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
+        val partitionAssignment = Option(globalPartitionAssignment.get(assignmentContext.consumerId)).getOrElse(
+          mutable.HashMap.empty[TopicAndPartition, ConsumerThreadId]
+        )
         val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
           valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
 
         // fetch current offsets for all topic-partitions
-        val topicPartitions = partitionOwnershipDecision.keySet.toSeq
+        val topicPartitions = partitionAssignment.keySet.toSeq
 
         val offsetFetchResponseOpt = fetchOffsets(topicPartitions)
 
@@ -698,7 +701,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           topicPartitions.foreach(topicAndPartition => {
             val (topic, partition) = topicAndPartition.asTuple
             val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
-            val threadId = partitionOwnershipDecision(topicAndPartition)
+            val threadId = partitionAssignment(topicAndPartition)
             addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
           })
 
@@ -706,10 +709,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
            * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
            * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
            */
-          if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
-            allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size
+          if(reflectPartitionOwnershipDecision(partitionAssignment)) {
+            allTopicsOwnedPartitionsCount = partitionAssignment.size
 
-            partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
+            partitionAssignment.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                       .foreach { case (topic, partitionThreadPairs) =>
               newGauge("OwnedPartitionsCount",
                 new Gauge[Int] {
@@ -719,6 +722,30 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             }
 
             topicRegistry = currentTopicRegistry
+            // Invoke beforeStartingFetchers callback if the consumerRebalanceListener is set.
+            if (consumerRebalanceListener != null) {
+              info("Invoking rebalance listener before starting fetchers.")
+
+              // Partition assignor returns the global partition assignment organized as a map of [TopicPartition, ThreadId]
+              // per consumer, and we need to re-organize it to a map of [Partition, ThreadId] per topic before passing
+              // to the rebalance callback.
+              val partitionAssginmentGroupByTopic = globalPartitionAssignment.values.flatten.groupBy[String] {
+                case (topicPartition, _) => topicPartition.topic
+              }
+              val partitionAssigmentMapForCallback = partitionAssginmentGroupByTopic.map({
+                case (topic, partitionOwnerShips) =>
+                  val partitionOwnershipForTopicScalaMap = partitionOwnerShips.map({
+                    case (topicAndPartition, consumerThreadId) =>
+                      topicAndPartition.partition -> consumerThreadId
+                  })
+                  topic -> mapAsJavaMap(collection.mutable.Map(partitionOwnershipForTopicScalaMap.toSeq:_*))
+                    .asInstanceOf[java.util.Map[java.lang.Integer, ConsumerThreadId]]
+              })
+              consumerRebalanceListener.beforeStartingFetchers(
+                consumerIdString,
+                mapAsJavaMap(collection.mutable.Map(partitionAssigmentMapForCallback.toSeq:_*))
+              )
+            }
             updateFetcher(cluster)
             true
           } else {
@@ -792,9 +819,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       }
     }
 
-    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, ConsumerThreadId]): Boolean = {
+    private def reflectPartitionOwnershipDecision(partitionAssignment: Map[TopicAndPartition, ConsumerThreadId]): Boolean = {
       var successfullyOwnedPartitions : List[(String, Int)] = Nil
-      val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
+      val partitionOwnershipSuccessful = partitionAssignment.map { partitionOwner =>
         val topic = partitionOwner._1.topic
         val partition = partitionOwner._1.partition
         val consumerThreadId = partitionOwner._2
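
For illustration, the reorganization performed above before invoking beforeStartingFetchers can be written as a stand-alone helper. This is a sketch only (the helper name is hypothetical); it flattens the per-consumer assignment and regroups it by topic into the Java map shape the callback expects:

    import java.{lang, util}
    import kafka.common.TopicAndPartition
    import kafka.consumer.ConsumerThreadId
    import kafka.utils.Pool
    import scala.collection.JavaConversions.mapAsJavaMap
    import scala.collection.mutable

    // Pool[consumerId -> (TopicAndPartition -> ConsumerThreadId)] becomes
    // Map[topic -> Map[partition -> ConsumerThreadId]].
    def toCallbackAssignment(global: Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]])
        : util.Map[String, util.Map[lang.Integer, ConsumerThreadId]] = {
      val byTopic = global.values.flatten.groupBy { case (topicPartition, _) => topicPartition.topic }
      val perTopic = byTopic.map { case (topic, ownerships) =>
        val partitionToThread = ownerships.map { case (topicPartition, threadId) =>
          (Integer.valueOf(topicPartition.partition), threadId)
        }
        topic -> mapAsJavaMap(mutable.Map(partitionToThread.toSeq: _*))
      }
      mapAsJavaMap(mutable.Map(perTopic.toSeq: _*))
    }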

http://git-wip-us.apache.org/repos/asf/kafka/blob/c41c7b40/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
index 7f45a90..288ebd9 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
@@ -17,6 +17,8 @@
 
 package kafka.javaapi.consumer;
 
+
+import kafka.consumer.ConsumerThreadId;
 import java.util.Map;
 import java.util.Set;
 
@@ -33,7 +35,17 @@ public interface ConsumerRebalanceListener {
      * This listener is initially added to prevent duplicate messages on consumer rebalance
      * in mirror maker, where offset auto commit is disabled to prevent data loss. It could
      * also be used in more general cases.
+     * @param partitionOwnership The partitions this consumer currently owns.
      */
     public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);
 
+    /**
+     * This method is called after the new partition assignment is finished but before fetcher
+     * threads start. The new global partition assignment is passed in as a parameter.
+     * @param consumerId The consumer id string of the consumer invoking this callback.
+     * @param globalPartitionAssignment A Map[topic, Map[Partition, ConsumerThreadId]]. It is the global partition
+     *                                  assignment of this consumer group.
+     */
+    public void beforeStartingFetchers(String consumerId, Map<String, Map<Integer, ConsumerThreadId>> globalPartitionAssignment);
+
 }
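
For illustration, a minimal sketch (not from this patch) of a user-supplied listener implementing the extended interface; the class name and println calls are placeholders. Such a listener can be registered through ZookeeperConsumerConnector.setConsumerRebalanceListener, or, for mirror maker, through the consumer rebalance listener option handled in MirrorMaker.scala below:

    import java.{lang, util}
    import kafka.consumer.ConsumerThreadId
    import kafka.javaapi.consumer.ConsumerRebalanceListener

    class LoggingRebalanceListener extends ConsumerRebalanceListener {

      override def beforeReleasingPartitions(partitionOwnership: util.Map[String, util.Set[lang.Integer]]) {
        // Fired after fetchers stop and before partition ownership is released; flush in-flight work here.
        println("About to release partitions: " + partitionOwnership)
      }

      override def beforeStartingFetchers(consumerId: String,
                                          globalPartitionAssignment: util.Map[String, util.Map[lang.Integer, ConsumerThreadId]]) {
        // Fired once the new group-wide assignment is decided but before fetchers start.
        println("Consumer " + consumerId + " sees global assignment: " + globalPartitionAssignment)
      }
    }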

http://git-wip-us.apache.org/repos/asf/kafka/blob/c41c7b40/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index bafa379..87b925c 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,87 +17,64 @@
 
 package kafka.tools
 
-import com.yammer.metrics.core._
-import kafka.common.{TopicAndPartition, OffsetAndMetadata}
+import java.util
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.{Collections, Properties}
+
+import scala.collection.JavaConversions._
+
+import com.yammer.metrics.core.Gauge
+import joptsimple.OptionParser
+import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
-import kafka.utils._
-import kafka.consumer._
-import kafka.serializer._
-import kafka.producer.{KeyedMessage, ProducerConfig}
+import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
+import kafka.serializer.DefaultDecoder
+import kafka.utils.{CommandLineUtils, Logging, Utils}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata, ProducerRecord}
-
-import joptsimple.OptionParser
-import java.util.Properties
-import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
-import java.util.concurrent._
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
 
 /**
- * The mirror maker consists of three major modules:
- *  Consumer Threads - The consumer threads consume messages from source Kafka cluster through
- *                     ZookeeperConsumerConnector and put them into corresponding data channel queue based on hash value
- *                     of source topic-partitionId string. This guarantees the message order in source partition is
- *                     preserved.
- *  Producer Threads - Producer threads take messages out of data channel queues and send them to target cluster. Each
- *                     producer thread is bound to one data channel queue, so that the message order is preserved.
- *  Data Channel - The data channel has multiple queues. The number of queue is same as number of producer threads.
+ * The mirror maker has the following architecture:
+ * - There are N mirror maker threads; each owns one ZookeeperConsumerConnector and one Kafka stream.
+ * - All the mirror maker threads share one producer.
+ * - Each mirror maker thread periodically flushes the producer and then commits all offsets.
  *
- * If new producer is used, the offset will be committed based on the new producer's callback. An offset map is
- * maintained and updated on each send() callback. A separate offset commit thread will commit the offset periodically.
- * @note For mirror maker, the following settings are required to make sure there is no data loss:
+ * @note For mirror maker, the following settings are set by default to make sure there is no data loss:
  *       1. use new producer with following settings
  *            acks=all
  *            retries=max integer
  *            block.on.buffer.full=true
  *       2. Consumer Settings
  *            auto.commit.enable=false
- *       If --no.data.loss flag is set in option, then those settings are automatically applied.
+ *       3. Mirror Maker Setting:
+ *            abort.on.send.failure=true
  */
 object MirrorMaker extends Logging with KafkaMetricsGroup {
 
-  private var connector: ZookeeperConsumerConnector = null
-  private var consumerThreads: Seq[ConsumerThread] = null
-  private var producerThreads: Seq[ProducerThread] = null
+  private var connectors: Seq[ZookeeperConsumerConnector] = null
+  private var producer: MirrorMakerProducer = null
+  private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
   private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
-  private val scheduler: KafkaScheduler = new KafkaScheduler(threads = 1)
-
-  private val unackedOffsetsMap: Pool[TopicAndPartition, UnackedOffsets] =
-      new Pool[TopicAndPartition, UnackedOffsets](valueFactory = Some((k: TopicAndPartition) => new UnackedOffsets))
   // Track the messages unacked for consumer rebalance
-  private var numUnackedMessages: AtomicInteger = new AtomicInteger(0)
-  private var numSkippedUnackedMessages: AtomicInteger = new AtomicInteger(0)
-  private var consumerRebalanceListener: ConsumerRebalanceListener = null
-  // This is to indicate whether the rebalance is going on so the producer callback knows if
-  // the flag indicates internal consumer rebalance callback is waiting for all the messages sent to be acked.
-  private var waitingForMessageAcks: Boolean = false
-
-  private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes)
-
-  newGauge("MirrorMaker-NumUnackedMessages",
-    new Gauge[Int] {
-      def value = numUnackedMessages.get()
-    })
-
-  // The number of unacked offsets in the unackedOffsetsMap
-  newGauge("MirrorMaker-UnackedOffsetListsSize",
-    new Gauge[Int] {
-      def value = unackedOffsetsMap.iterator.map{
-        case(_, unackedOffsets) => unackedOffsets.size
-      }.sum
-    })
+  private var numDroppedMessages: AtomicInteger = new AtomicInteger(0)
+  private var messageHandler: MirrorMakerMessageHandler = null
+  private var offsetCommitIntervalMs = 0
+  private var abortOnSendFailure: Boolean = true
+  @volatile private var exitingOnSendFailure: Boolean = false
 
   // If a message send failed after retries are exhausted. The offset of the messages will also be removed from
   // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that
   // message was not really acked, but was skipped. This metric records the number of skipped offsets.
-  newGauge("MirrorMaker-NumSkippedOffsets",
+  newGauge("MirrorMaker-numDroppedMessages",
     new Gauge[Int] {
-      def value = numSkippedUnackedMessages.get()
+      def value = numDroppedMessages.get()
     })
 
   def main(args: Array[String]) {
-    
-    info ("Starting mirror maker")
+
+    info("Starting mirror maker")
     val parser = new OptionParser
 
     val consumerConfigOpt = parser.accepts("consumer.config",
@@ -113,19 +90,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       .describedAs("config file")
       .ofType(classOf[String])
 
-    val useNewProducerOpt = parser.accepts("new.producer",
-      "Use the new producer implementation.")
-
-    val noDataLossOpt = parser.accepts("no.data.loss",
-      "Configure the mirror maker to have no data loss.")
-
-    val numProducersOpt = parser.accepts("num.producers",
-      "Number of producer instances")
-      .withRequiredArg()
-      .describedAs("Number of producers")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-    
     val numStreamsOpt = parser.accepts("num.streams",
       "Number of consumption streams.")
       .withRequiredArg()
@@ -133,20 +97,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
 
-    val bufferSizeOpt =  parser.accepts("queue.size",
-      "Number of messages that are buffered between the consumer and producer")
-      .withRequiredArg()
-      .describedAs("Queue size in terms of number of messages")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(10000)
-
-    val bufferByteSizeOpt =  parser.accepts("queue.byte.size",
-      "Maximum bytes that can be buffered in each data channel queue")
-      .withRequiredArg()
-      .describedAs("Data channel queue size in terms of number of bytes")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100000000)
-
     val whitelistOpt = parser.accepts("whitelist",
       "Whitelist of topics to mirror.")
       .withRequiredArg()
@@ -160,7 +110,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       .ofType(classOf[String])
 
     val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms",
-       "Offset commit interval in ms")
+      "Offset commit interval in ms")
       .withRequiredArg()
       .describedAs("offset commit interval in millisecond")
       .ofType(classOf[java.lang.Integer])
@@ -172,12 +122,38 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       .describedAs("A custom rebalance listener of type ConsumerRebalanceListener")
       .ofType(classOf[String])
 
+    val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args",
+      "Arguments used by custom rebalance listener for mirror maker consumer")
+      .withRequiredArg()
+      .describedAs("Arguments passed to custom rebalance listener constructor as a string.")
+      .ofType(classOf[String])
+
+    val messageHandlerOpt = parser.accepts("message.handler",
+      "The message handler to use for processing each record passed from the mirror maker consumer to the producer.")
+      .withRequiredArg()
+      .describedAs("A custom message handler of type MirrorMakerMessageHandler")
+      .ofType(classOf[String])
+
+    val messageHandlerArgsOpt = parser.accepts("message.handler.args",
+      "Arguments used by the custom message handler for mirror maker")
+      .withRequiredArg()
+      .describedAs("Arguments passed to message handler constructor.")
+      .ofType(classOf[String])
+
+    val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure",
+      "Configure the mirror maker to exit on a failed send.")
+      .withRequiredArg()
+      .describedAs("Stop the entire mirror maker when a send failure occurs")
+      .ofType(classOf[String])
+      .defaultsTo("true")
+
     val helpOpt = parser.accepts("help", "Print this message.")
-    
-    if(args.length == 0)
+
+    if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.")
 
-    val options = parser.parse(args : _*)
+
+    val options = parser.parse(args: _*)
 
     if (options.has(helpOpt)) {
       parser.printHelpOn(System.out)
@@ -190,109 +166,112 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       System.exit(1)
     }
 
-    val numProducers = options.valueOf(numProducersOpt).intValue()
+    abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean
+    offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
     val numStreams = options.valueOf(numStreamsOpt).intValue()
-    val bufferSize = options.valueOf(bufferSizeOpt).intValue()
-    val bufferByteSize = options.valueOf(bufferByteSizeOpt).intValue()
-    val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
 
-    // create consumer connector
+    // create producer
+    val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
+    // Defaults to no data loss settings.
+    maybeSetDefaultProperty(producerProps, "retries", Int.MaxValue.toString)
+    maybeSetDefaultProperty(producerProps, "block.on.buffer.full", "true")
+    maybeSetDefaultProperty(producerProps, "acks", "all")
+    producer = new MirrorMakerProducer(producerProps)
+
+    // Create consumer connector
     val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt))
-    val noDataLoss = options.has(noDataLossOpt)
-    // disable consumer auto commit because offset will be committed by offset commit thread.
-    if (noDataLoss)
-      consumerConfigProps.setProperty("auto.commit.enable","false")
-    val consumerConfig = new ConsumerConfig(consumerConfigProps)
-    connector = new ZookeeperConsumerConnector(consumerConfig)
-
-    // create a data channel btw the consumers and the producers
-    val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numInputs = numStreams, numOutputs = numProducers)
-
-    // set consumer rebalance listener
-    // custom rebalance listener will be invoked after internal listener finishes its work.
-    val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
-    val customRebalanceListener = {
-      if (customRebalanceListenerClass != null)
-        Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
-      else
-        None
+    // Disable consumer auto offset commit to prevent data loss.
+    maybeSetDefaultProperty(consumerConfigProps, "auto.commit.enable", "false")
+    // Set the consumer timeout so we will not block on low-volume pipelines. The timeout is necessary to make sure
+    // offsets are still committed for those low-volume pipelines.
+    maybeSetDefaultProperty(consumerConfigProps, "consumer.timeout.ms", "10000")
+    // The default client id is the group id; we manually set the client id to groupId-index to avoid metric collisions
+    val groupIdString = consumerConfigProps.getProperty("group.id")
+    connectors = (0 until numStreams) map { i =>
+      consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
+      val consumerConfig = new ConsumerConfig(consumerConfigProps)
+      new ZookeeperConsumerConnector(consumerConfig)
     }
-    consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, customRebalanceListener)
-    connector.setConsumerRebalanceListener(consumerRebalanceListener)
 
-    // create producer threads
-    val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
-    val useNewProducer = {
-      // Override producer settings if no.data.loss is set
-      if (noDataLoss) {
-        producerProps.setProperty("retries",Int.MaxValue.toString)
-        producerProps.setProperty("block.on.buffer.full", "true")
-        producerProps.setProperty("acks","all")
-        true
+    // Set consumer rebalance listener.
+    // Custom rebalance listener will be invoked after internal listener finishes its work.
+    val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
+    val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
+    val customRebalanceListener = {
+      if (customRebalanceListenerClass != null) {
+        if (rebalanceListenerArgs != null)
+          Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs))
+        else
+          Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass))
       } else {
-        options.has(useNewProducerOpt)
+        None
       }
     }
-    val clientId = producerProps.getProperty("client.id", "")
-    producerThreads = (0 until numProducers).map(i => {
-      producerProps.setProperty("client.id", clientId + "-" + i)
-      val producer =
-      if (useNewProducer) {
-        producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                          "org.apache.kafka.common.serialization.ByteArraySerializer")
-        producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                          "org.apache.kafka.common.serialization.ByteArraySerializer")
-        new MirrorMakerNewProducer(producerProps)
-      }
-      else
-        new MirrorMakerOldProducer(producerProps)
-      new ProducerThread(mirrorDataChannel, producer, i)
-    })
-
-    // start offset commit thread
-    if (noDataLoss) {
-      /**
-       * The offset commit thread periodically commit consumed offsets. With the new producer,
-       * the offsets are updated upon the returned future metadata of the send() call; with the old producer,
-       * the offsets are updated upon the consumer's iterator advances. By doing this, it is guaranteed no data
-       * loss even when mirror maker is uncleanly shutdown with the new producer, while with the old producer
-       * messages inside the data channel could be lost upon mirror maker unclean shutdown.
-       */
-      scheduler.startup()
-      scheduler.schedule("offset-commit", commitOffsets, 0, offsetCommitIntervalMs, TimeUnit.MILLISECONDS)
+    connectors.foreach {
+      connector =>
+        val consumerRebalanceListener = new InternalRebalanceListener(connector, customRebalanceListener)
+        connector.setConsumerRebalanceListener(consumerRebalanceListener)
     }
 
-    // create consumer threads
+    // create Kafka streams
     val filterSpec = if (options.has(whitelistOpt))
       new Whitelist(options.valueOf(whitelistOpt))
     else
       new Blacklist(options.valueOf(blacklistOpt))
 
-    var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil
-    try {
-      streams = connector.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder())
-    } catch {
-      case t: Throwable =>
-        fatal("Unable to create stream - shutting down mirror maker.", t)
-        connector.shutdown()
+    // create a (connector->stream) sequence
+    val connectorStream = (0 until numStreams) map {
+      i => {
+        var stream: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
+        try {
+          // Create just one stream for each connector instance
+          stream = connectors(i).createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
+          require(stream.size == 1)
+        } catch {
+          case t: Throwable =>
+            fatal("Unable to create stream - shutting down mirror maker.", t)
+            connectors(i).shutdown()
+        }
+        connectors(i) -> stream(0)
+      }
+    }
+
+    // Create mirror maker threads
+    mirrorMakerThreads = (0 until numStreams) map ( i =>
+        new MirrorMakerThread(connectorStream(i)._1, connectorStream(i)._2, i)
+    )
+
+    // Create and initialize message handler
+    val customMessageHandlerClass = options.valueOf(messageHandlerOpt)
+    val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt)
+    messageHandler = {
+      if (customMessageHandlerClass != null) {
+        if (messageHandlerArgs != null)
+          Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
+        else
+          Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
+      } else {
+        defaultMirrorMakerMessageHandler
+      }
     }
-    consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2))
-    assert(consumerThreads.size == numStreams)
 
-    Runtime.getRuntime.addShutdownHook(new Thread() {
+    Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
       override def run() {
         cleanShutdown()
       }
     })
 
-    consumerThreads.foreach(_.start)
-    producerThreads.foreach(_.start)
+    mirrorMakerThreads.foreach(_.start())
+    mirrorMakerThreads.foreach(_.awaitShutdown())
+  }
 
-    // we wait on producer's shutdown latch instead of consumers
-    // since the consumer threads can hit a timeout/other exception;
-    // but in this case the producer should still be able to shutdown
-    // based on the shutdown message in the channel
-    producerThreads.foreach(_.awaitShutdown())
+  def commitOffsets(connector: ZookeeperConsumerConnector) {
+    if (!exitingOnSendFailure) {
+      trace("Committing offsets.")
+      connector.commitOffsets
+    } else {
+      info("Exiting on send failure, skip committing offsets.")
+    }
   }
 
   def cleanShutdown() {
@@ -300,167 +279,56 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       info("Start clean shutdown.")
       // Shutdown consumer threads.
       info("Shutting down consumer threads.")
-      if (consumerThreads != null) {
-        consumerThreads.foreach(_.shutdown())
-        consumerThreads.foreach(_.awaitShutdown())
-      }
-      // After consumer threads exit, shutdown producer.
-      info("Shutting down producer threads.")
-      if (producerThreads != null) {
-        producerThreads.foreach(_.shutdown())
-        producerThreads.foreach(_.awaitShutdown())
+      if (mirrorMakerThreads != null) {
+        mirrorMakerThreads.foreach(_.shutdown())
+        mirrorMakerThreads.foreach(_.awaitShutdown())
       }
-      // offset commit thread should only be shutdown after producer threads are shutdown, so we don't lose offsets.
-      scheduler.shutdown()
-      swallow(commitOffsets())
-
-      // connector should only be shutdown after offsets are committed.
+      info("Closing producer.")
+      producer.close()
+      connectors.foreach(commitOffsets)
+      // Connector should only be shutdown after offsets are committed.
       info("Shutting down consumer connectors.")
-      if (connector != null)
-        connector.shutdown()
+      connectors.foreach(_.shutdown())
       info("Kafka mirror maker shutdown successfully")
     }
   }
 
-  class DataChannel(messageCapacity: Int, byteCapacity: Int, numInputs: Int, numOutputs: Int)
-      extends KafkaMetricsGroup {
-
-    val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numOutputs)
-    val channelSizeHists = new Array[Histogram](numOutputs)
-    val channelByteSizeHists = new Array[Histogram](numOutputs)
-    val sizeFunction = (record: MirrorMakerRecord) => record.size
-    for (i <- 0 until numOutputs) {
-      queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](messageCapacity, byteCapacity, Some(sizeFunction))
-      channelSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-NumMessages".format(i))
-      channelByteSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Bytes".format(i))
-    }
-    private val channelRecordSizeHist = newHistogram("MirrorMaker-DataChannel-Record-Size")
-
-    // We use a single meter for aggregated wait percentage for the data channel.
-    // Since meter is calculated as total_recorded_value / time_window and
-    // time_window is independent of the number of threads, each recorded wait
-    // time should be discounted by # threads.
-    private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
-    private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
-
-    def put(record: MirrorMakerRecord) {
-      // Use hash of source topic-partition to decide which queue to put the message in. The benefit is that
-      // we can maintain the message order for both keyed and non-keyed messages.
-      val queueId =
-        Utils.abs(java.util.Arrays.hashCode((record.sourceTopic + record.sourcePartition).toCharArray)) % numOutputs
-      put(record, queueId)
-    }
-
-    def put(record: MirrorMakerRecord, queueId: Int) {
-      val queue = queues(queueId)
-
-      var putSucceed = false
-      while (!putSucceed) {
-        val startPutTime = SystemTime.nanoseconds
-        putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
-        waitPut.mark((SystemTime.nanoseconds - startPutTime) / numInputs)
-      }
-      channelSizeHists(queueId).update(queue.size())
-      channelByteSizeHists(queueId).update(queue.byteSize())
-      channelRecordSizeHist.update(sizeFunction(record))
-    }
-
-    def take(queueId: Int): MirrorMakerRecord = {
-      val queue = queues(queueId)
-      var data: MirrorMakerRecord = null
-      while (data == null) {
-        val startTakeTime = SystemTime.nanoseconds
-        data = queue.poll(500, TimeUnit.MILLISECONDS)
-        waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numOutputs)
-      }
-      channelSizeHists(queueId).update(queue.size())
-      channelByteSizeHists(queueId).update(queue.byteSize())
-      data
-    }
-
-    def clear() {
-      queues.foreach(queue => queue.clear())
-    }
-  }
-
-  class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
-                       mirrorDataChannel: DataChannel,
-                       threadId: Int)
-          extends Thread with Logging with KafkaMetricsGroup {
-
-    private val shutdownLatch = new CountDownLatch(1)
-    private val threadName = "mirrormaker-consumer-" + threadId
-    this.logIdent = "[%s] ".format(threadName)
-    private var shutdownFlag: Boolean = false
-
-    this.setName(threadName)
-
-    override def run() {
-      info("Starting mirror maker consumer thread " + threadName)
-      try {
-        val iter = stream.iterator()
-        while (!shutdownFlag && iter.hasNext()) {
-          val msgAndMetadata = iter.next()
-          val data = new MirrorMakerRecord(msgAndMetadata.topic,
-                                           msgAndMetadata.partition,
-                                           msgAndMetadata.offset,
-                                           msgAndMetadata.key(),
-                                           msgAndMetadata.message())
-          mirrorDataChannel.put(data)
-        }
-      } catch {
-        case e: Throwable => {
-          fatal("Stream unexpectedly exited.", e)
-        }
-      } finally {
-        shutdownLatch.countDown()
-        info("Consumer thread stopped")
-
-        // If it exits accidentally, stop the entire mirror maker.
-        if (!isShuttingdown.get()) {
-          fatal("Consumer thread exited abnormally, stopping the whole mirror maker.")
-          System.exit(-1)
-        }
-      }
-    }
-
-    def shutdown() {
-      shutdownFlag = true
-    }
-
-    def awaitShutdown() {
-      try {
-        shutdownLatch.await()
-        info("Consumer thread shutdown complete")
-      } catch {
-        case e: InterruptedException => fatal("Shutdown of the consumer thread interrupted. This might leak data!")
-      }
-    }
+  private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) {
+    properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue))
+    if (properties.getProperty(propertyName) != defaultValue)
+      info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, properties.getProperty(propertyName)))
   }
 
-  class ProducerThread (val dataChannel: DataChannel,
-                        val producer: MirrorMakerBaseProducer,
-                        val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
-    private val threadName = "mirrormaker-producer-" + threadId
+  class MirrorMakerThread(connector: ZookeeperConsumerConnector,
+                          stream: KafkaStream[Array[Byte], Array[Byte]],
+                          val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
+    private val threadName = "mirrormaker-thread-" + threadId
     private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
+    private var lastOffsetCommitMs = System.currentTimeMillis()
+    @volatile private var shuttingDown: Boolean = false
     this.logIdent = "[%s] ".format(threadName)
 
     setName(threadName)
 
     override def run() {
-      info("Starting mirror maker producer thread " + threadName)
+      info("Starting mirror maker thread " + threadName)
+      val iter = stream.iterator()
       try {
-        while (true) {
-          val data: MirrorMakerRecord = dataChannel.take(threadId)
-          trace("Sending message with value size %d".format(data.value.size))
-          if(data eq shutdownMessage) {
-            info("Received shutdown message")
-            return
+        // TODO: Needs to be changed after KAFKA-1660 is available.
+        while (!exitingOnSendFailure && !shuttingDown) {
+          try {
+            while (!exitingOnSendFailure && !shuttingDown && iter.hasNext()) {
+              val data = iter.next()
+              trace("Sending message with value size %d".format(data.message().size))
+              val records = messageHandler.handle(data)
+              records.foreach(producer.send)
+              maybeFlushAndCommitOffsets()
+            }
+          } catch {
+            case e: ConsumerTimeoutException =>
+              trace("Caught ConsumerTimeoutException, continue iteration.")
           }
-          producer.send(new TopicAndPartition(data.sourceTopic, data.sourcePartition),
-                        data.sourceOffset,
-                        data.key,
-                        data.value)
+          maybeFlushAndCommitOffsets()
         }
       } catch {
         case t: Throwable =>
@@ -470,215 +338,112 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         info("Producer thread stopped")
         // if it exits accidentally, stop the entire mirror maker
         if (!isShuttingdown.get()) {
-          fatal("Producer thread exited abnormally, stopping the whole mirror maker.")
+          fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
           System.exit(-1)
         }
       }
     }
 
+    def maybeFlushAndCommitOffsets() {
+      if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
+        producer.flush()
+        commitOffsets(connector)
+        lastOffsetCommitMs = System.currentTimeMillis()
+      }
+    }
+
     def shutdown() {
       try {
-        info("Producer thread " + threadName + " shutting down")
-        dataChannel.put(shutdownMessage, threadId)
+        info(threadName + " shutting down")
+        shuttingDown = true
       }
       catch {
-        case ie: InterruptedException => {
+        case ie: InterruptedException =>
           warn("Interrupt during shutdown of ProducerThread")
-        }
       }
     }
 
     def awaitShutdown() {
       try {
         shutdownLatch.await()
-        producer.close()
-        info("Producer thread shutdown complete")
+        info("Mirror maker thread shutdown complete")
       } catch {
-        case ie: InterruptedException => {
+        case ie: InterruptedException =>
           warn("Shutdown of the producer thread interrupted")
-        }
       }
     }
   }
 
-  private def commitOffsets()  {
-    try {
-      info("Committing offsets")
-      val offsetsToCommit = collection.immutable.Map(unackedOffsetsMap.map {
-        case (topicPartition, unackedOffsets) =>
-          topicPartition -> OffsetAndMetadata(unackedOffsets.getOffsetToCommit, null)
-      }.toSeq: _*)
-      if (connector == null) {
-        warn("No consumer connector available to commit offset.")
-      } else {
-        connector.commitOffsets(offsetsToCommit, isAutoCommit = false)
-      }
-    } catch {
-      case e: OutOfMemoryError =>
-        fatal("Shutting down mirror maker due to error when committing offsets.", e)
-        System.exit(-1)
-      case t: Throwable =>
-        warn("Offsets commit failed due to ", t)
-    }
-  }
-
-  private[kafka] trait MirrorMakerBaseProducer {
-    def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte])
-    def close()
-  }
-
-  private class MirrorMakerNewProducer (val producerProps: Properties) extends MirrorMakerBaseProducer {
+  private class MirrorMakerProducer(val producerProps: Properties) {
 
     val sync = producerProps.getProperty("producer.type", "async").equals("sync")
 
     val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
 
-    override def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte]) {
-      val record = new ProducerRecord[Array[Byte], Array[Byte]](sourceTopicPartition.topic, key, value)
-      if(sync) {
+    def send(record: ProducerRecord[Array[Byte], Array[Byte]]) {
+      if (sync) {
         this.producer.send(record).get()
-        unackedOffsetsMap.getAndMaybePut(sourceTopicPartition).maybeUpdateMaxOffsetSeen(sourceOffset)
       } else {
-
-        val unackedOffsets = unackedOffsetsMap.getAndMaybePut(sourceTopicPartition)
-        // synchronize to ensure that addOffset precedes removeOffset
-        unackedOffsets synchronized {
-          val unackedOffset = new UnackedOffset(sourceOffset)
           this.producer.send(record,
-            new MirrorMakerProducerCallback(sourceTopicPartition, unackedOffset, key, value))
-          // add offset to unackedOffsets
-          unackedOffsets.addOffset(unackedOffset)
-          numUnackedMessages.incrementAndGet()
-        }
+            new MirrorMakerProducerCallback(record.topic(), record.key(), record.value()))
       }
     }
 
-    override def close() {
-      this.producer.close()
-    }
-  }
-
-  private class MirrorMakerOldProducer (val producerProps: Properties) extends MirrorMakerBaseProducer {
-
-    // default to byte array partitioner
-    if (producerProps.getProperty("partitioner.class") == null)
-      producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)
-    val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
-
-    override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
-      this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topicPartition.topic, key, value))
+    def flush() {
+      this.producer.flush()
     }
 
-    override def close() {
+    def close() {
       this.producer.close()
     }
   }
 
-  private class MirrorMakerProducerCallback (val topicPartition: TopicAndPartition,
-                                             val offset: UnackedOffset,
-                                             val key: Array[Byte],
-                                             val value: Array[Byte])
-    extends ErrorLoggingCallback(topicPartition.topic, key, value, false) {
+  private class MirrorMakerProducerCallback (topic: String, key: Array[Byte], value: Array[Byte])
+    extends ErrorLoggingCallback(topic, key, value, false) {
 
     override def onCompletion(metadata: RecordMetadata, exception: Exception) {
       if (exception != null) {
         // Use default call back to log error. This means the max retries of producer has reached and message
-        // still could not be sent. In this case we have to remove the offsets from list to let the mirror maker
-        // move on. The message failed to be sent will be lost in target cluster.
-        warn("Not be able to send message, offset of "+ topicPartition + " will not advance. Total number" +
-          "of skipped unacked messages is" + numSkippedUnackedMessages.incrementAndGet())
+        // still could not be sent.
         super.onCompletion(metadata, exception)
-      } else {
-        trace("Updating offset for %s to %d".format(topicPartition, offset.element))
-      }
-      // remove the offset from the unackedOffsets
-      val unackedOffsets = unackedOffsetsMap.get(topicPartition)
-      unackedOffsets.removeOffset(offset)
-      // Notify the rebalance callback only when all the messages handed to producer are acked.
-      // There is a very slight chance that one message is held by producer thread and not handed to producer.
-      // That message might have duplicate. We are not handling that here.
-      numUnackedMessages synchronized {
-        if (numUnackedMessages.decrementAndGet() == 0 && waitingForMessageAcks) {
-            numUnackedMessages.notify()
-        }
+        // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on.
+        if (abortOnSendFailure)
+          exitingOnSendFailure = true
       }
     }
   }
 
-  class InternalRebalanceListener (dataChannel: DataChannel, customRebalanceListener: Option[ConsumerRebalanceListener])
-      extends ConsumerRebalanceListener {
 
-    override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
-      info("Clearing data channel.")
-      dataChannel.clear()
-      info("Waiting until all the messages are acked.")
-      numUnackedMessages synchronized {
-        waitingForMessageAcks = true
-        while (numUnackedMessages.get() > 0) {
-          try {
-            numUnackedMessages.wait()
-          } catch {
-            case e: InterruptedException => info("Ignoring interrupt while waiting.")
-          }
-        }
-        waitingForMessageAcks = false
-      }
-      info("Committing offsets.")
-      commitOffsets()
+  private class InternalRebalanceListener(connector: ZookeeperConsumerConnector,
+                                          customRebalanceListener: Option[ConsumerRebalanceListener])
+    extends ConsumerRebalanceListener {
 
+    override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
+      producer.flush()
+      commitOffsets(connector)
       // invoke custom consumer rebalance listener
       if (customRebalanceListener.isDefined)
         customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership)
     }
-  }
 
-  private[kafka] class MirrorMakerRecord (val sourceTopic: String,
-                                          val sourcePartition: Int,
-                                          val sourceOffset: Long,
-                                          val key: Array[Byte],
-                                          val value: Array[Byte]) {
-    def size = {if (value == null) 0 else value.length} + {if (key == null) 0 else key.length}
+    override def beforeStartingFetchers(consumerId: String,
+                                        partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) {
+      if (customRebalanceListener.isDefined)
+        customRebalanceListener.get.beforeStartingFetchers(consumerId, partitionAssignment)
+    }
   }
 
-  private class UnackedOffset(offset: Long) extends DoublyLinkedListNode[Long](offset) {
-
+  /**
+   * If message.handler.args is specified, a constructor that takes in a String as argument must exist.
+   */
+  trait MirrorMakerMessageHandler {
+    def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
   }
 
-  private class UnackedOffsets {
-    val offsetList = new DoublyLinkedList[Long]
-    var maxOffsetSeen: Long = -1L
-
-    def maybeUpdateMaxOffsetSeen(offset: Long) {
-      this synchronized {
-        maxOffsetSeen = math.max(maxOffsetSeen, offset)
-      }
-    }
-
-    def addOffset(offset: DoublyLinkedListNode[Long]) {
-      this synchronized {
-        offsetList.add(offset)
-        maybeUpdateMaxOffsetSeen(offset.element)
-      }
+  private object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
+    override def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
+      Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key(), record.message()))
     }
-
-    def removeOffset(offset: DoublyLinkedListNode[Long]) {
-      this synchronized {
-        offsetList.remove(offset)
-      }
-    }
-
-    def getOffsetToCommit: Long = {
-      this synchronized {
-        val smallestUnackedOffset = offsetList.peek()
-        if (smallestUnackedOffset == null)
-          // list is empty, commit maxOffsetSeen + 1
-          maxOffsetSeen + 1
-        else
-          // commit the smallest unacked offset
-          smallestUnackedOffset.element
-      }
-    }
-
-    def size: Int = offsetList.size
   }
-}
\ No newline at end of file
+
+}
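
For illustration, a minimal sketch (not from this patch) of a custom handler for the MirrorMakerMessageHandler trait introduced above; the class name and the topic prefix are placeholders. The single-String constructor is what Utils.createObject needs when --message.handler.args is supplied alongside --message.handler:

    import java.util
    import java.util.Collections
    import kafka.message.MessageAndMetadata
    import kafka.tools.MirrorMaker.MirrorMakerMessageHandler
    import org.apache.kafka.clients.producer.ProducerRecord

    // Hypothetical handler that mirrors every record to "<prefix>.<sourceTopic>" on the target cluster.
    class TopicPrefixMessageHandler(prefix: String) extends MirrorMakerMessageHandler {
      override def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
        Collections.singletonList(
          new ProducerRecord[Array[Byte], Array[Byte]](prefix + "." + record.topic, record.key(), record.message()))
      }
    }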

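The no-data-loss defaults described in the class comment above (acks=all, retries=Int.MaxValue, block.on.buffer.full=true) can be reproduced for a stand-alone producer as follows. This is a sketch only; the broker address and byte-array serializer settings are placeholder assumptions, not taken from the patch:

    import java.util.Properties
    import org.apache.kafka.clients.producer.KafkaProducer

    object NoDataLossProducerDefaults {
      // Only set a property when the user has not already supplied a value, mirroring maybeSetDefaultProperty above.
      private def setDefault(props: Properties, key: String, value: String) {
        if (props.getProperty(key) == null)
          props.setProperty(key, value)
      }

      def main(args: Array[String]) {
        val props = new Properties()
        // Placeholder connection and serializer settings.
        props.setProperty("bootstrap.servers", "target-broker:9092")
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
        // Defaults applied unless explicitly overridden in the producer config.
        setDefault(props, "retries", Int.MaxValue.toString)
        setDefault(props, "block.on.buffer.full", "true")
        setDefault(props, "acks", "all")
        val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
        producer.close()
      }
    }
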
http://git-wip-us.apache.org/repos/asf/kafka/blob/c41c7b40/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 543070f..1910fcb 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -164,7 +164,7 @@ private object PartitionAssignorTest extends Logging {
                               verifyAssignmentIsUniform: Boolean = false) {
     val assignments = scenario.subscriptions.map{ case(consumer, subscription)  =>
       val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient)
-      assignor.assign(ctx)
+      assignor.assign(ctx).get(consumer)
     }
 
     // check for uniqueness (i.e., any partition should be assigned to exactly one consumer stream)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c41c7b40/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 19640cc..155fd04 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -17,22 +17,22 @@
 
 package kafka.consumer
 
+import java.util.{Collections, Properties}
+
 import junit.framework.Assert._
+import kafka.common.MessageStreamsExistException
 import kafka.integration.KafkaServerTestHarness
 import kafka.javaapi.consumer.ConsumerRebalanceListener
-import kafka.server._
-import scala.collection._
-import scala.collection.JavaConversions._
-import org.scalatest.junit.JUnit3Suite
 import kafka.message._
 import kafka.serializer._
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils._
-import kafka.producer.{KeyedMessage, Producer}
-import java.util.{Collections, Properties}
-import org.apache.log4j.{Logger, Level}
+import kafka.server._
 import kafka.utils.TestUtils._
-import kafka.common.{TopicAndPartition, MessageStreamsExistException}
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+
+import scala.collection._
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -362,10 +362,18 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
 
     // Check if rebalance listener is fired
-    assertEquals(true, rebalanceListener1.listenerCalled)
+    assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled)
+    assertEquals(true, rebalanceListener1.beforeStartingFetchersCalled)
     assertEquals(null, rebalanceListener1.partitionOwnership.get(topic))
+    // Check if partition assignment in rebalance listener is correct
+    assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(0).consumer)
+    assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(1).consumer)
+    assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(0).threadId)
+    assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(1).threadId)
+    assertEquals("group1_consumer1", rebalanceListener1.consumerId)
     // reset the flag
-    rebalanceListener1.listenerCalled = false
+    rebalanceListener1.beforeReleasingPartitionsCalled = false
+    rebalanceListener1.beforeStartingFetchersCalled = false
 
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_1 = List(("0", "group1_consumer1-0"),
@@ -379,16 +387,26 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
 
+    // Consume messages from consumer 1 to make sure it has finished rebalance
+    getMessages(nMessages, topicMessageStreams1)
+
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_2 = List(("0", "group1_consumer1-0"),
                           ("1", "group1_consumer2-0"))
     assertEquals(expected_2, actual_2)
 
     // Check if rebalance listener is fired
-    assertEquals(true, rebalanceListener1.listenerCalled)
+    assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled)
+    assertEquals(true, rebalanceListener1.beforeStartingFetchersCalled)
     assertEquals(Set[Int](0, 1), rebalanceListener1.partitionOwnership.get(topic))
-    assertEquals(true, rebalanceListener2.listenerCalled)
-    assertEquals(null, rebalanceListener2.partitionOwnership.get(topic))
+    // Check if global partition ownership in rebalance listener is correct
+    assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(0).consumer)
+    assertEquals("group1_consumer2", rebalanceListener1.globalPartitionOwnership.get(topic).get(1).consumer)
+    assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(0).threadId)
+    assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(1).threadId)
+    assertEquals("group1_consumer1", rebalanceListener1.consumerId)
+    assertEquals("group1_consumer2", rebalanceListener2.consumerId)
+    assertEquals(rebalanceListener1.globalPartitionOwnership, rebalanceListener2.globalPartitionOwnership)
     zkConsumerConnector1.shutdown()
     zkConsumerConnector2.shutdown()
   }
@@ -397,7 +415,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val children = zkClient.getChildren(path)
     Collections.sort(children)
     val childrenAsSeq : Seq[java.lang.String] = {
-      import JavaConversions._
+      import scala.collection.JavaConversions._
       children.toSeq
     }
     childrenAsSeq.map(partition =>
@@ -405,13 +423,22 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   }
 
   private class TestConsumerRebalanceListener extends ConsumerRebalanceListener {
-    var listenerCalled: Boolean = false
+    var beforeReleasingPartitionsCalled: Boolean = false
+    var beforeStartingFetchersCalled: Boolean = false
+    var consumerId: String = ""
     var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null
+    var globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]] = null
 
     override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
-      listenerCalled = true
+      beforeReleasingPartitionsCalled = true
       this.partitionOwnership = partitionOwnership
     }
+
+    override def beforeStartingFetchers(consumerId: String, globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) {
+      beforeStartingFetchersCalled = true
+      this.consumerId = consumerId
+      this.globalPartitionOwnership = globalPartitionOwnership
+    }
   }
 
 }

