kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-4402: make the KafkaProducer true round robin per topic
Date Mon, 09 Jan 2017 04:08:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fa80093c0 -> 42a6b7166


KAFKA-4402: make the KafkaProducer true round robin per topic

Author: yaojuncn <yaojuncn@users.noreply.github.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>,
Konstantin <konstantin@tubemogul.com>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava
<ewen@confluent.io>

Closes #2128 from yaojuncn/KAFKA-4402-client-producer-round-robin-fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/42a6b716
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/42a6b716
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/42a6b716

Branch: refs/heads/trunk
Commit: 42a6b7166348455482934f7845160d10cd435eaa
Parents: fa80093
Author: yaojuncn <yaojuncn@users.noreply.github.com>
Authored: Sun Jan 8 20:07:40 2017 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Sun Jan 8 20:08:07 2017 -0800

----------------------------------------------------------------------
 .../producer/internals/DefaultPartitioner.java  | 18 +++++++++--
 .../internals/DefaultPartitionerTest.java       | 34 +++++++++++++++++++-
 2 files changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/42a6b716/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index 241e809..f8ed0b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.producer.internals;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.producer.Partitioner;
@@ -35,7 +37,7 @@ import org.apache.kafka.common.utils.Utils;
  */
 public class DefaultPartitioner implements Partitioner {
 
-    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
+    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
 
     public void configure(Map<String, ?> configs) {}
 
@@ -53,7 +55,7 @@ public class DefaultPartitioner implements Partitioner {
         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
         int numPartitions = partitions.size();
         if (keyBytes == null) {
-            int nextValue = counter.getAndIncrement();
+            int nextValue = nextValue(topic);
             List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
             if (availablePartitions.size() > 0) {
                 int part = Utils.toPositive(nextValue) % availablePartitions.size();
@@ -68,6 +70,18 @@ public class DefaultPartitioner implements Partitioner {
         }
     }
 
+    private int nextValue(String topic) {
+        AtomicInteger counter = topicCounterMap.get(topic);
+        if (null == counter) {
+            counter = new AtomicInteger(new Random().nextInt());
+            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
+            if (currentCounter != null) {
+                counter = currentCounter;
+            }
+        }
+        return counter.getAndIncrement();
+    }
+
     public void close() {}
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/42a6b716/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index ee8441c..2e2c821 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -19,8 +19,9 @@ import org.apache.kafka.common.PartitionInfo;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-
+import java.util.Map;
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -62,4 +63,35 @@ public class DefaultPartitionerTest {
         }
         assertEquals("The distribution between two available partitions should be even",
countForPart0, countForPart2);
     }
+
+    @Test
+    public void testRoundRobin() throws InterruptedException {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+
+        List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, node0,
nodes, nodes),
+                new PartitionInfo(topicA, 1, node1, nodes, nodes),
+                new PartitionInfo(topicA, 2, node2, nodes, nodes),
+                new PartitionInfo(topicB, 0, node0, nodes, nodes)
+                );
+        Cluster testCluster = new Cluster("clusterId", asList(node0, node1, node2), allPartitions,
+                Collections.<String>emptySet(), Collections.<String>emptySet());
+
+        final Map<Integer, Integer> partitionCount = new HashMap<>();
+
+        for (int i = 0; i < 30; ++i) {
+            int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
+            Integer count = partitionCount.get(partition);
+            if (null == count) count = 0;
+            partitionCount.put(partition, count + 1);
+
+            if (i % 5 == 0) {
+                partitioner.partition(topicB, null, null, null, null, testCluster);
+            }
+        }
+
+        assertEquals(10, (int) partitionCount.get(0));
+        assertEquals(10, (int) partitionCount.get(1));
+        assertEquals(10, (int) partitionCount.get(2));
+    }
 }


Mime
View raw message