kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-1988; Fix org.apache.kafka.common.utils.Utils.abs and add Partitioner.toPositive; reviewed by Jun Rao and Guozhang Wang
Date Wed, 04 Mar 2015 01:15:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 00c643e6f -> 3a9f4b833


KAFKA-1988; Fix org.apache.kafka.common.utils.Utils.abs and add Partitioner.toPositive; reviewed
by Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a9f4b83
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a9f4b83
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a9f4b83

Branch: refs/heads/trunk
Commit: 3a9f4b833bb7e86dc759361c33f4321ab043db05
Parents: 00c643e
Author: Tong Li <litong02@us.ibm.com>
Authored: Tue Mar 3 17:15:30 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Mar 3 17:15:30 2015 -0800

----------------------------------------------------------------------
 .../clients/producer/internals/Partitioner.java | 23 ++++++++++++++++----
 .../org/apache/kafka/common/utils/Utils.java    |  2 +-
 .../apache/kafka/common/utils/UtilsTest.java    |  9 ++++++++
 3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3a9f4b83/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index dfb936d..93e7991 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.utils.Utils;
 
-
 /**
  * The default partitioning strategy:
  * <ul>
@@ -37,6 +36,22 @@ public class Partitioner {
     private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
 
     /**
+     * A cheap way to deterministically convert a number to a positive value. When the input is
+     * positive, the original value is returned. When the input number is negative, the returned
+     * positive value is the original value bit AND against 0x7fffffff which is not its absolutely
+     * value.
+     * 
+     * Note: changing this method in the future will possibly cause partition selection not to be
+     * compatible with the existing messages already placed on a partition. 
+     * 
+     * @param number a given number
+     * @return a positive number.
+     */
+    private static int toPositive(int number) {
+        return number & 0x7fffffff; 
+    }
+
+    /**
      * Compute the partition for the given record.
      * 
      * @param topic The topic name
@@ -59,15 +74,15 @@ public class Partitioner {
             int nextValue = counter.getAndIncrement();
             List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
             if (availablePartitions.size() > 0) {
-                int part = Utils.abs(nextValue) % availablePartitions.size();
+                int part = Partitioner.toPositive(nextValue) % availablePartitions.size();
                 return availablePartitions.get(part).partition();
             } else {
                 // no partitions are available, give a non-available partition
-                return Utils.abs(nextValue) % numPartitions;
+                return Partitioner.toPositive(nextValue) % numPartitions;
             }
         } else {
             // hash the key to choose a partition
-            return Utils.abs(Utils.murmur2(key)) % numPartitions;
+            return Partitioner.toPositive(Utils.murmur2(key)) % numPartitions;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a9f4b83/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 69530c1..920b51a 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -163,7 +163,7 @@ public class Utils {
      * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
      */
     public static int abs(int n) {
-        return n & 0x7fffffff;
+        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a9f4b83/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 4c2ea34..c899813 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -58,4 +58,13 @@ public class UtilsTest {
         assertEquals("1", Utils.join(Arrays.asList("1"), ","));
         assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ","));
     }
+
+    @Test
+    public void testAbs() {
+        assertEquals(0, Utils.abs(Integer.MIN_VALUE));
+        assertEquals(10, Utils.abs(-10));
+        assertEquals(10, Utils.abs(10));
+        assertEquals(0, Utils.abs(0));
+        assertEquals(1, Utils.abs(-1));
+    }
 }
\ No newline at end of file


Mime
View raw message