kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject git commit: KAFKA-925 Add a partition key that overrides the message's stored key. Patch reviewed by Joel.
Date Mon, 29 Jul 2013 22:34:07 GMT
Updated Branches:
  refs/heads/trunk c27c76846 -> d285e263b


KAFKA-925 Add a partition key that overrides the message's stored key. Patch reviewed by Joel.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d285e263
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d285e263
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d285e263

Branch: refs/heads/trunk
Commit: d285e263bf403f3db27f5d138594c395643a2284
Parents: c27c768
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Mon Jul 29 15:32:36 2013 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Mon Jul 29 15:32:36 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/producer/ConsoleProducer.scala  |  2 +-
 .../kafka/producer/DefaultPartitioner.scala     |  4 +--
 .../scala/kafka/producer/KeyedMessage.scala     | 18 ++++++++++--
 .../main/scala/kafka/producer/Partitioner.scala |  4 +--
 .../main/scala/kafka/producer/Producer.scala    |  2 +-
 .../producer/async/DefaultEventHandler.scala    | 11 +++----
 .../scala/other/kafka/TestLogCleaning.scala     |  4 +--
 .../unit/kafka/integration/FetcherTest.scala    |  2 +-
 .../unit/kafka/producer/AsyncProducerTest.scala | 31 ++++++++++----------
 .../test/scala/unit/kafka/utils/TestUtils.scala | 14 ++++-----
 10 files changed, 53 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 5539bce..59222a2 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -60,7 +60,7 @@ object ConsoleProducer {
                                .withRequiredArg
                                .describedAs("queue enqueuetimeout ms")
                                .ofType(classOf[java.lang.Long])
-                               .defaultsTo(0)
+                               .defaultsTo(Int.MaxValue)
    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
                                .withRequiredArg
                                .describedAs("request required acks")

http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 9bffeb6..37ddd55 100644
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -20,10 +20,10 @@ package kafka.producer
 
 import kafka.utils._
 
-private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
+private class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
   private val random = new java.util.Random
   
-  def partition(key: T, numPartitions: Int): Int = {
+  def partition(key: Any, numPartitions: Int): Int = {
     Utils.abs(key.hashCode) % numPartitions
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/main/scala/kafka/producer/KeyedMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala
index b13c4ec..388bc9b 100644
--- a/core/src/main/scala/kafka/producer/KeyedMessage.scala
+++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala
@@ -18,13 +18,25 @@
 package kafka.producer
 
 /**
- * A topic, key, and value
+ * A topic, key, and value.
+ * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
  */
-case class KeyedMessage[K, V](val topic: String, val key: K, val message: V) {
+case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
   if(topic == null)
     throw new IllegalArgumentException("Topic cannot be null.")
   
-  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], message)
+  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
+  
+  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
+  
+  def partitionKey = {
+    if(partKey != null)
+      partKey
+    else if(hasKey)
+      key
+    else
+      null  
+  }
   
   def hasKey = key != null
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/main/scala/kafka/producer/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Partitioner.scala b/core/src/main/scala/kafka/producer/Partitioner.scala
index 9ee61c7..efe6d6d 100644
--- a/core/src/main/scala/kafka/producer/Partitioner.scala
+++ b/core/src/main/scala/kafka/producer/Partitioner.scala
@@ -23,11 +23,11 @@ package kafka.producer
  * Implementations will be constructed via reflection and are required to have a constructor that takes a single 
  * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
  */
-trait Partitioner[T] {
+trait Partitioner {
   /**
    * Uses the key to calculate a partition bucket id for routing
    * the data to the appropriate broker partition
    * @return an integer between 0 and numPartitions-1
    */
-  def partition(key: T, numPartitions: Int): Int
+  def partition(key: Any, numPartitions: Int): Int
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index bb16a29..ba94e87 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -56,7 +56,7 @@ class Producer[K,V](val config: ProducerConfig,
   def this(config: ProducerConfig) =
     this(config,
          new DefaultEventHandler[K,V](config,
-                                      Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                       Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                       Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                       new ProducerPool(config)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 1ecaeaa..48ddb6a 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic._
 import kafka.api.{TopicMetadata, ProducerRequest}
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
-                               private val partitioner: Partitioner[K],
+                               private val partitioner: Partitioner,
                                private val encoder: Encoder[V],
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
@@ -126,9 +126,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     events.map{e =>
       try {
         if(e.hasKey)
-          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = e.key, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
+          serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
         else
-          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
+          serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message)))
       } catch {
         case t =>
           producerStats.serializationErrorRate.mark()
@@ -149,7 +149,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     try {
       for (message <- messages) {
         val topicPartitionsList = getPartitionListForTopic(message)
-        val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList)
+        val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
         val brokerPartition = topicPartitionsList(partitionIndex)
 
         // postpone the failure until the send operation, so that requests for other brokers are handled correctly
@@ -196,11 +196,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   /**
    * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
    * the value of partition is not between 0 and numPartitions-1
+   * @param topic The topic
    * @param key the partition key
    * @param topicPartitionList the list of available partitions
    * @return the partition id
    */
-  private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
+  private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
     val numPartitions = topicPartitionList.size
     if(numPartitions <= 0)
       throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")

http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/test/scala/other/kafka/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index 0bef218..22b16e5 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -260,9 +260,9 @@ object TestLogCleaning {
       val delete = i % 100 < percentDeletes
       val msg = 
         if(delete)
-          KeyedMessage[String, String](topic = topic, key = key.toString, message = null)
+          new KeyedMessage[String, String](topic = topic, key = key.toString, message = null)
         else
-          KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
+          new KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
       producer.send(msg)
       producedWriter.write(TestRecord(topic, key, i, delete).toString)
       producedWriter.newLine()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 67ed201..b1d56d6 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -88,7 +88,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
                                                                              new StringEncoder())
       val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms
-      producer.send(ms.map(m => KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
+      producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
       producer.close()
       count += ms.size
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index f2f91e8..74a2743 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -158,11 +158,12 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testPartitionAndCollateEvents() {
     val producerDataList = new ArrayBuffer[KeyedMessage[Int,Message]]
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
+    // use bogus key and partition key override for some messages
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 0, message = new Message("msg1".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = -99, partKey = 1, message = new Message("msg2".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 2, message = new Message("msg3".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = -101, partKey = 3, message = new Message("msg4".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = 4, message = new Message("msg5".getBytes)))
 
     val props = new Properties()
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
@@ -179,8 +180,8 @@ class AsyncProducerTest extends JUnit3Suite {
     topicPartitionInfos.put("topic1", topic1Metadata)
     topicPartitionInfos.put("topic2", topic2Metadata)
 
-    val intPartitioner = new Partitioner[Int] {
-      def partition(key: Int, numPartitions: Int): Int = key % numPartitions
+    val intPartitioner = new Partitioner {
+      def partition(key: Any, numPartitions: Int): Int = key.asInstanceOf[Int] % numPartitions
     }
     val config = new ProducerConfig(props)
 
@@ -195,9 +196,9 @@ class AsyncProducerTest extends JUnit3Suite {
     val topic1Broker1Data =
      ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
                                             new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
-    val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
+    val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", -101, 3, new Message("msg4".getBytes)))
     val topic2Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
-    val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
+    val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", -99, 1, new Message("msg2".getBytes)))
     val expectedResult = Some(Map(
         0 -> Map(
               TopicAndPartition("topic1", 0) -> topic1Broker1Data,
@@ -225,7 +226,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val producerPool = new ProducerPool(config)
 
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
@@ -285,7 +286,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
     producerDataList.append(new KeyedMessage[String,String]("topic1", "msg1"))
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
@@ -332,7 +333,7 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val producerPool = new ProducerPool(config)
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
@@ -373,7 +374,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val msgs = TestUtils.getMsgStrings(10)
 
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
@@ -506,6 +507,6 @@ class AsyncProducerTest extends JUnit3Suite {
   }
 }
 
-class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = -1
+class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = -1
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d285e263/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6b343e3..148bb4b 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -525,18 +525,18 @@ class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
   override def toBytes(n: Int) = n.toString.getBytes
 }
 
-class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
-    (data.length % numPartitions)
+class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner{
+  def partition(data: Any, numPartitions: Int): Int = {
+    (data.asInstanceOf[String].length % numPartitions)
   }
 }
 
-class HashPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
+class HashPartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = {
     (data.hashCode % numPartitions)
   }
 }
 
-class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner[Int] {
-  def partition(data: Int, numPartitions: Int): Int = data
+class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = data.asInstanceOf[Int]
 }


Mime
View raw message