kafka-commits mailing list archives

From jjko...@apache.org
Subject git commit: KAFKA-957; MirrorMaker needs to preserve ordering for keyed messages from source cluster; patched by Guozhang Wang, reviewed by Joel Koshy
Date Wed, 24 Jul 2013 18:43:17 GMT
Updated Branches:
  refs/heads/0.8 401d59199 -> 5cf6a5466


KAFKA-957; MirrorMaker needs to preserve ordering for keyed messages from source cluster;
patched by Guozhang Wang, reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5cf6a546
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5cf6a546
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5cf6a546

Branch: refs/heads/0.8
Commit: 5cf6a546649c213c33ef41d5d47e6af959c0c587
Parents: 401d591
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Wed Jul 24 10:28:26 2013 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Jul 24 11:41:22 2013 -0700

----------------------------------------------------------------------
 .../kafka/producer/ByteArrayPartitioner.scala   | 27 ++++++++++++++++++
 .../main/scala/kafka/tools/MirrorMaker.scala    | 30 ++++++++++++++++----
 2 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
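
The patch preserves per-key ordering with one rule applied in two places: hash the raw
key bytes and take the result modulo a count. The new ByteArrayPartitioner does this to
pick a partition, and MirrorMaker does the same to pin each key to one producer. Below is
a minimal sketch of that rule (illustrative only, not part of the commit; the patch uses
kafka.utils.Utils.abs to keep the index non-negative):

    // Content-based hash of the key bytes, masked to stay non-negative, modulo n.
    // java.util.Arrays.hashCode depends only on the byte contents, so equal keys
    // always map to the same index; Array.hashCode (identity-based) would not.
    def indexForKey(key: Array[Byte], n: Int): Int =
      (java.util.Arrays.hashCode(key) & 0x7fffffff) % n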


http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf6a546/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
new file mode 100644
index 0000000..752a4fc
--- /dev/null
+++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+
+import kafka.utils._
+
+private class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner[Array[Byte]] {
+  def partition(key: Array[Byte], numPartitions: Int): Int = {
+    Utils.abs(java.util.Arrays.hashCode(key)) % numPartitions
+  }
+}
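
Outside MirrorMaker, the same partitioner would be selected through the producer's
partitioner.class property, which is the property the MirrorMaker change below checks
before falling back to this class. A hedged usage sketch (broker address, topic, and
payload are hypothetical; the 0.8 producer's default byte-array serializer is assumed):

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")                    // hypothetical broker
    props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")  // hash the key bytes
    val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
    // Messages that share the same key bytes go to the same partition, preserving their order.
    producer.send(new KeyedMessage[Array[Byte], Array[Byte]]("mirror-topic", "key".getBytes, "value".getBytes))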

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf6a546/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 2d93947..a85bfa2 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -22,6 +22,7 @@ import kafka.utils.{Utils, CommandLineUtils, Logging}
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import scala.collection.JavaConversions._
 import java.util.concurrent.CountDownLatch
+import java.nio.ByteBuffer
 import kafka.consumer._
 import kafka.serializer._
 import collection.mutable.ListBuffer
@@ -99,8 +100,15 @@ object MirrorMaker extends Logging {
     val bufferSize = options.valueOf(bufferSizeOpt).intValue()
 
     val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
-      val config = new ProducerConfig(
-        Utils.loadProps(options.valueOf(producerConfigOpt)))
+      val props = Utils.loadProps(options.valueOf(producerConfigOpt))
+      val config = props.getProperty("partitioner.class") match {
+        case null =>
+          new ProducerConfig(props) {
+            override val partitionerClass = "kafka.producer.ByteArrayPartitioner"
+          }
+        case pClass : String =>
+          new ProducerConfig(props)
+      }
       new Producer[Array[Byte], Array[Byte]](config)
     })
 
@@ -125,7 +133,7 @@ object MirrorMaker extends Logging {
     val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
 
     val consumerThreads =
-      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
+      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, producers, streamAndIndex._2))
 
     val producerThreads = new ListBuffer[ProducerThread]()
 
@@ -162,6 +170,7 @@ object MirrorMaker extends Logging {
 
   class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
                           producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]],
+                          producers: Seq[Producer[Array[Byte], Array[Byte]]],
                           threadId: Int)
           extends Thread with Logging {
 
@@ -174,8 +183,19 @@ object MirrorMaker extends Logging {
       info("Starting mirror maker thread " + threadName)
       try {
         for (msgAndMetadata <- stream) {
-          val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
-          producerDataChannel.sendRequest(pd)
+          // If the key of the message is empty, put it into the universal channel
+          // Otherwise use a pre-assigned producer to send the message
+          if (msgAndMetadata.key == null) {
+            trace("Send the non-keyed message the producer channel.")
+            val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
+            producerDataChannel.sendRequest(pd)
+          } else {
+            val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size()
+            trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId))
+            val producer = producers(producerId)
+            val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
+            producer.send(pd)
+          }
         }
       } catch {
         case e =>
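
Taken together, the new dispatch keeps unkeyed traffic on the shared producerDataChannel,
where any producer thread may pick it up, while every keyed message goes straight to one
deterministically chosen producer. As a worked example with illustrative numbers (not from
the commit): with 4 producers and a key whose Arrays.hashCode is 1234567, every message
carrying that key is routed to producer 1234567 % 4 = 3, so those messages are sent by a
single producer in the order they were consumed from the source cluster.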

