kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1984; java producer may miss an available partition; patched by Jun Rao; reviewed by Ewen Cheslack-Postava, Jay Kreps, and Guozhang Wang
Date Tue, 24 Feb 2015 21:52:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 5f3eb1caf -> ee1267b12


kafka-1984; java producer may miss an available partition; patched by Jun Rao; reviewed by Ewen Cheslack-Postava, Jay Kreps, and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee1267b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee1267b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee1267b1

Branch: refs/heads/0.8.2
Commit: ee1267b127f3081db491fa1bf9a287084c324e36
Parents: 5f3eb1c
Author: Jun Rao <junrao@gmail.com>
Authored: Tue Feb 24 13:52:10 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Feb 24 13:52:10 2015 -0800

----------------------------------------------------------------------
 .../clients/producer/internals/Partitioner.java | 15 +++++-----
 .../java/org/apache/kafka/common/Cluster.java   | 24 ++++++++++++++--
 .../kafka/clients/producer/PartitionerTest.java | 30 ++++++++++----------
 3 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1267b1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index 483899d..f5abdd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -55,14 +55,15 @@ public class Partitioner {
                                                    + "].");
             return record.partition();
         } else if (record.key() == null) {
-            // choose the next available node in a round-robin fashion
-            for (int i = 0; i < numPartitions; i++) {
-                int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
-                if (partitions.get(partition).leader() != null)
-                    return partition;
+            int nextValue = counter.getAndIncrement();
+            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
+            if (availablePartitions.size() > 0) {
+                int part = Utils.abs(nextValue) % availablePartitions.size();
+                return availablePartitions.get(part).partition();
+            } else {
+                // no partitions are available, give a non-available partition
+                return Utils.abs(nextValue) % numPartitions;
             }
-            // no partitions are available, give a non-available partition
-            return Utils.abs(counter.getAndIncrement()) % numPartitions;
         } else {
             // hash the key to choose a partition
             return Utils.abs(Utils.murmur2(record.key())) % numPartitions;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1267b1/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index d3299b9..1e54527 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -25,6 +25,7 @@ public final class Cluster {
     private final List<Node> nodes;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
+    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
     private final Map<Integer, List<PartitionInfo>> partitionsByNode;
 
     /**
@@ -63,8 +64,18 @@ public final class Cluster {
             }
         }
         this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
-        for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet())
-            this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+        this.availablePartitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
+        for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
+            String topic = entry.getKey();
+            List<PartitionInfo> partitionList = entry.getValue();
+            this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
+            List<PartitionInfo> availablePartitions = new ArrayList<PartitionInfo>();
+            for (PartitionInfo part : partitionList) {
+                if (part.leader() != null)
+                    availablePartitions.add(part);
+            }
+            this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
+        }
         this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
         for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
             this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
@@ -130,6 +141,15 @@ public final class Cluster {
     }
 
     /**
+     * Get the list of available partitions for this topic
+     * @param topic The topic name
+     * @return A list of partitions
+     */
+    public List<PartitionInfo> availablePartitionsForTopic(String topic) {
+        return this.availablePartitionsByTopic.get(topic);
+    }
+
+    /**
      * Get the list of partitions whose leader is this node
      * @param nodeId The node id
      * @return A list of partitions

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1267b1/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index 1002f05..2519ce4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -38,9 +38,10 @@ public class PartitionerTest {
     private Node node2 = new Node(2, "localhost", 101);
     private Node[] nodes = new Node[] { node0, node1, node2 };
     private String topic = "test";
-    private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes),
-                                                    new PartitionInfo(topic, 1, node1, nodes, nodes),
-                                                    new PartitionInfo(topic, 2, null, nodes, nodes));
+    // Intentionally make the partition list not in partition order to test the edge cases.
+    private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
+                                                    new PartitionInfo(topic, 2, node1, nodes, nodes),
+                                                    new PartitionInfo(topic, 0, node0, nodes, nodes));
     private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
 
     @Test
@@ -59,20 +60,19 @@ public class PartitionerTest {
     }
 
     @Test
-    public void testRoundRobinIsStable() {
-        int startPart = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
+    public void testRoundRobinWithUnavailablePartitions() {
+        // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
+        // and (2) the available partitions are selected in a round robin way.
+        int countForPart0 = 0;
+        int countForPart2 = 0;
         for (int i = 1; i <= 100; i++) {
-            int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
-            assertEquals("Should yield a different partition each call with round-robin partitioner",
-                partition, (startPart + i) % 2);
-      }
-    }
-
-    @Test
-    public void testRoundRobinWithDownNode() {
-        for (int i = 0; i < partitions.size(); i++) {
             int part = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
-            assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
+            assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2);
+            if (part == 0)
+                countForPart0++;
+            else
+                countForPart2++;
         }
+        assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
     }
 }


Mime
View raw message