Updated Branches:
refs/heads/0.8 dcbf0bf0b -> 93921a3a5
KAFKA-1071; The random partition selected in DefaultEventHandler is not random across producer
instances; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/93921a3a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/93921a3a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/93921a3a
Branch: refs/heads/0.8
Commit: 93921a3a5720a1ffd9e272d59d8a7627da28e89e
Parents: dcbf0bf
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Thu Oct 3 17:46:53 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Oct 3 17:46:53 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/producer/async/DefaultEventHandler.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/93921a3a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index c151032..eba375b 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -22,6 +22,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging, SystemTime}
+import scala.util.Random
import scala.collection.{Seq, Map}
import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
import java.util.concurrent.atomic._
@@ -36,7 +37,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
extends EventHandler[K,V] with Logging {
val isSync = ("sync" == config.producerType)
- val partitionCounter = new AtomicInteger(0)
val correlationId = new AtomicInteger(0)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
@@ -217,7 +217,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic
" + topic)
- val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
+ val index = Utils.abs(Random.nextInt) % availablePartitions.size
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId)
partitionId
|