kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch trunk updated: KAFKA-3333: Adds RoundRobinPartitioner with tests (#6771)
Date Tue, 09 Jul 2019 18:46:25 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9973758  KAFKA-3333: Adds RoundRobinPartitioner with tests (#6771)
9973758 is described below

commit 99737588b6db7647d0d03b7306f143714fbae4d5
Author: mmanna-sapfgl <38857392+mmanna-sapfgl@users.noreply.github.com>
AuthorDate: Tue Jul 9 19:46:07 2019 +0100

    KAFKA-3333: Adds RoundRobinPartitioner with tests (#6771)
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, Sriharsha Chintalapani <sriharsha@apache.org>, Matthias J. Sax <matthias@confluent.io>
---
 ...Partitioner.java => RoundRobinPartitioner.java} |  47 +++-----
 .../producer/internals/DefaultPartitioner.java     |   2 +-
 .../producer/RoundRobinPartitionerTest.java        | 124 +++++++++++++++++++++
 3 files changed, 143 insertions(+), 30 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
similarity index 57%
copy from clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
copy to clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
index 9d4ecbf..8c21164 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
@@ -14,28 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients.producer.internals;
+package org.apache.kafka.clients.producer;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.utils.Utils;
 
 /**
- * The default partitioning strategy:
- * <ul>
- * <li>If a partition is specified in the record, use it
- * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
- * <li>If no partition or key is present choose a partition in a round-robin fashion
+ * The "Round-Robin" partitioner
+ * 
+ * This partitioning strategy can be used when user wants 
+ * to distribute the writes to all partitions equally. This
+ * is the behaviour regardless of record key hash. 
+ *
  */
-public class DefaultPartitioner implements Partitioner {
+public class RoundRobinPartitioner implements Partitioner {
 
     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
 
@@ -54,31 +53,21 @@ public class DefaultPartitioner implements Partitioner {
     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
         int numPartitions = partitions.size();
-        if (keyBytes == null) {
-            int nextValue = nextValue(topic);
-            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
-            if (availablePartitions.size() > 0) {
-                int part = Utils.toPositive(nextValue) % availablePartitions.size();
-                return availablePartitions.get(part).partition();
-            } else {
-                // no partitions are available, give a non-available partition
-                return Utils.toPositive(nextValue) % numPartitions;
-            }
+        int nextValue = nextValue(topic);
+        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
+        if (!availablePartitions.isEmpty()) {
+            int part = Utils.toPositive(nextValue) % availablePartitions.size();
+            return availablePartitions.get(part).partition();
         } else {
-            // hash the keyBytes to choose a partition
-            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+            // no partitions are available, give a non-available partition
+            return Utils.toPositive(nextValue) % numPartitions;
         }
     }
 
     private int nextValue(String topic) {
-        AtomicInteger counter = topicCounterMap.get(topic);
-        if (null == counter) {
-            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
-            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
-            if (currentCounter != null) {
-                counter = currentCounter;
-            }
-        }
+        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
+            return new AtomicInteger(0);
+        });
         return counter.getAndIncrement();
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index 9d4ecbf..72f8f4b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -57,7 +57,7 @@ public class DefaultPartitioner implements Partitioner {
         if (keyBytes == null) {
             int nextValue = nextValue(topic);
             List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
-            if (availablePartitions.size() > 0) {
+            if (!availablePartitions.isEmpty()) {
                 int part = Utils.toPositive(nextValue) % availablePartitions.size();
                 return availablePartitions.get(part).partition();
             } else {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
new file mode 100644
index 0000000..d1c326b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RoundRobinPartitionerTest {
+    private byte[] keyBytes = "key".getBytes();
+    private Partitioner partitioner = new RoundRobinPartitioner();
+    private Node node0 = new Node(0, "localhost", 99);
+    private Node node1 = new Node(1, "localhost", 100);
+    private Node node2 = new Node(2, "localhost", 101);
+    private Node[] nodes = new Node[] {node0, node1, node2};
+    private String topic = "test";
+    // Intentionally make the partition list not in partition order to test the edge
+    // cases.
+    private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
+            new PartitionInfo(topic, 2, node1, nodes, nodes), new PartitionInfo(topic, 0, node0, nodes, nodes));
+    private Cluster cluster = new Cluster("clusterId", asList(node0, node1, node2), partitions,
+            Collections.<String>emptySet(), Collections.<String>emptySet());
+
+    @Test
+    public void testRoundRobinWithUnavailablePartitions() {
+        // When there are some unavailable partitions, we want to make sure that (1) we
+        // always pick an available partition,
+        // and (2) the available partitions are selected in a round robin way.
+        int countForPart0 = 0;
+        int countForPart2 = 0;
+        for (int i = 1; i <= 100; i++) {
+            int part = partitioner.partition(topic, null, null, null, null, cluster);
+            assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2);
+            if (part == 0)
+                countForPart0++;
+            else
+                countForPart2++;
+        }
+        assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
+    }
+
+    @Test
+    public void testRoundRobinWithKeyBytes() throws InterruptedException {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+
+        List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, node0, nodes, nodes),
+                new PartitionInfo(topicA, 1, node1, nodes, nodes), new PartitionInfo(topicA, 2, node2, nodes, nodes),
+                new PartitionInfo(topicB, 0, node0, nodes, nodes));
+        Cluster testCluster = new Cluster("clusterId", asList(node0, node1, node2), allPartitions,
+                Collections.<String>emptySet(), Collections.<String>emptySet());
+
+        final Map<Integer, Integer> partitionCount = new HashMap<>();
+
+        for (int i = 0; i < 30; ++i) {
+            int partition = partitioner.partition(topicA, null, keyBytes, null, null, testCluster);
+            Integer count = partitionCount.get(partition);
+            if (null == count)
+                count = 0;
+            partitionCount.put(partition, count + 1);
+
+            if (i % 5 == 0) {
+                partitioner.partition(topicB, null, keyBytes, null, null, testCluster);
+            }
+        }
+
+        assertEquals(10, (int) partitionCount.get(0));
+        assertEquals(10, (int) partitionCount.get(1));
+        assertEquals(10, (int) partitionCount.get(2));
+    }
+    
+    @Test
+    public void testRoundRobinWithNullKeyBytes() throws InterruptedException {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+
+        List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, node0, nodes, nodes),
+                new PartitionInfo(topicA, 1, node1, nodes, nodes), new PartitionInfo(topicA, 2, node2, nodes, nodes),
+                new PartitionInfo(topicB, 0, node0, nodes, nodes));
+        Cluster testCluster = new Cluster("clusterId", asList(node0, node1, node2), allPartitions,
+                Collections.<String>emptySet(), Collections.<String>emptySet());
+
+        final Map<Integer, Integer> partitionCount = new HashMap<>();
+
+        for (int i = 0; i < 30; ++i) {
+            int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
+            Integer count = partitionCount.get(partition);
+            if (null == count)
+                count = 0;
+            partitionCount.put(partition, count + 1);
+
+            if (i % 5 == 0) {
+                partitioner.partition(topicB, null, null, null, null, testCluster);
+            }
+        }
+
+        assertEquals(10, (int) partitionCount.get(0));
+        assertEquals(10, (int) partitionCount.get(1));
+        assertEquals(10, (int) partitionCount.get(2));
+    }    
+}


Mime
View raw message