kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12897: KRaft multi-partition placement on single broker (#10823)
Date Mon, 07 Jun 2021 21:15:05 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 43db8ac  KAFKA-12897: KRaft multi-partition placement on single broker (#10823)
43db8ac is described below

commit 43db8ac86ac3e609d9e2a993444b8f1b22f7693b
Author: Ron Dagostino <rdagostino@confluent.io>
AuthorDate: Mon Jun 7 17:13:06 2021 -0400

    KAFKA-12897: KRaft multi-partition placement on single broker (#10823)
    
    #10494 introduced a bug in the KRaft controller where the controller will loop forever
in StripedReplicaPlacer trying to identify the racks on which to place partition replicas
if there is a single unfenced broker in the cluster and the number of requested partitions
in a CREATE_TOPICS request is greater than 1.
    
    This patch refactors out some argument sanity checks and invokes those checks in both
RackList and StripedReplicaPlacer, and it adds tests for this as well as the single broker
placement issue.
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 .../kafka/controller/StripedReplicaPlacer.java     | 42 +++++++++-----
 .../kafka/controller/StripedReplicaPlacerTest.java | 65 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 13 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
index a2aadc5..031354c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
@@ -340,14 +340,14 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
         }
 
         List<Integer> place(int replicationFactor) {
-            if (replicationFactor <= 0) {
-                throw new InvalidReplicationFactorException("Invalid replication factor "
+
-                        replicationFactor + ": the replication factor must be positive.");
-            }
+            throwInvalidReplicationFactorIfNonPositive(replicationFactor);
+            throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor, numTotalBrokers());
+            throwInvalidReplicationFactorIfZero(numUnfencedBrokers());
             // If we have returned as many assignments as there are unfenced brokers in
             // the cluster, shuffle the rack list and broker lists to try to avoid
             // repeating the same assignments again.
-            if (epoch == numUnfencedBrokers) {
+            // But don't reset the iteration epoch for a single unfenced broker -- otherwise
we would loop forever
+            if (epoch == numUnfencedBrokers && numUnfencedBrokers > 1) {
                 shuffle();
                 epoch = 0;
             }
@@ -400,6 +400,27 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
         }
     }
 
+    private static void throwInvalidReplicationFactorIfNonPositive(int replicationFactor)
{
+        if (replicationFactor <= 0) {
+            throw new InvalidReplicationFactorException("Invalid replication factor " +
+                    replicationFactor + ": the replication factor must be positive.");
+        }
+    }
+
+    private static void throwInvalidReplicationFactorIfZero(int numUnfenced) {
+        if (numUnfenced == 0) {
+            throw new InvalidReplicationFactorException("All brokers are currently fenced.");
+        }
+    }
+
+    private static void throwInvalidReplicationFactorIfTooFewBrokers(int replicationFactor,
int numTotalBrokers) {
+        if (replicationFactor > numTotalBrokers) {
+            throw new InvalidReplicationFactorException("The target replication factor "
+
+                    "of " + replicationFactor + " cannot be reached because only " +
+                    numTotalBrokers + " broker(s) are registered.");
+        }
+    }
+
     private final Random random;
 
     public StripedReplicaPlacer(Random random) {
@@ -412,14 +433,9 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
                                      short replicationFactor,
                                      Iterator<UsableBroker> iterator) {
         RackList rackList = new RackList(random, iterator);
-        if (rackList.numUnfencedBrokers() == 0) {
-            throw new InvalidReplicationFactorException("All brokers are currently fenced.");
-        }
-        if (replicationFactor > rackList.numTotalBrokers()) {
-            throw new InvalidReplicationFactorException("The target replication factor "
+
-                "of " + replicationFactor + " cannot be reached because only " +
-                rackList.numTotalBrokers() + " broker(s) are registered.");
-        }
+        throwInvalidReplicationFactorIfNonPositive(replicationFactor);
+        throwInvalidReplicationFactorIfZero(rackList.numUnfencedBrokers());
+        throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor, rackList.numTotalBrokers());
         List<List<Integer>> placements = new ArrayList<>(numPartitions);
         for (int partition = 0; partition < numPartitions; partition++) {
             placements.add(rackList.place(replicationFactor));
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
index 667e900..c3fbb09 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
@@ -85,6 +85,22 @@ public class StripedReplicaPlacerTest {
     }
 
     /**
+     * Test that we perform striped replica placement as expected for a multi-partition topic
+     * on a single unfenced broker
+     */
+    @Test
+    public void testMultiPartitionTopicPlacementOnSingleUnfencedBroker() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals(Arrays.asList(Arrays.asList(0),
+                Arrays.asList(0),
+                Arrays.asList(0)),
+                placer.place(0, 3, (short) 1, Arrays.asList(
+                        new UsableBroker(0, Optional.empty(), false),
+                        new UsableBroker(1, Optional.empty(), true)).iterator()));
+    }
+
+    /**
      * Test that we will place on the fenced replica if we need to.
      */
     @Test
@@ -168,6 +184,17 @@ public class StripedReplicaPlacerTest {
     }
 
     @Test
+    public void testNonPositiveReplicationFactor() {
+        MockRandom random = new MockRandom();
+        StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
+        assertEquals("Invalid replication factor 0: the replication factor must be positive.",
+                assertThrows(InvalidReplicationFactorException.class,
+                        () -> placer.place(0, 1, (short) 0, Arrays.asList(
+                                new UsableBroker(11, Optional.of("1"), false),
+                                new UsableBroker(10, Optional.of("1"), false)).iterator())).getMessage());
+    }
+
+    @Test
     public void testSuccessfulPlacement() {
         MockRandom random = new MockRandom();
         StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
@@ -210,4 +237,42 @@ public class StripedReplicaPlacerTest {
         assertEquals(11, counts.get(Arrays.asList(3, 2)));
     }
 
+    @Test
+    public void testRackListAllBrokersFenced() {
+        // ensure we can place N replicas on a rack when the rack has less than N brokers
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+                new UsableBroker(0, Optional.empty(), true),
+                new UsableBroker(1, Optional.empty(), true),
+                new UsableBroker(2, Optional.empty(), true)).iterator());
+        assertEquals(3, rackList.numTotalBrokers());
+        assertEquals(0, rackList.numUnfencedBrokers());
+        assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
+        assertEquals("All brokers are currently fenced.",
+                assertThrows(InvalidReplicationFactorException.class,
+                        () -> rackList.place(3)).getMessage());
+    }
+
+    @Test
+    public void testRackListNotEnoughBrokers() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+                new UsableBroker(11, Optional.of("1"), false),
+                new UsableBroker(10, Optional.of("1"), false)).iterator());
+        assertEquals("The target replication factor of 3 cannot be reached because only "
+
+                        "2 broker(s) are registered.",
+                assertThrows(InvalidReplicationFactorException.class,
+                        () -> rackList.place(3)).getMessage());
+    }
+
+    @Test
+    public void testRackListNonPositiveReplicationFactor() {
+        MockRandom random = new MockRandom();
+        RackList rackList = new RackList(random, Arrays.asList(
+                new UsableBroker(11, Optional.of("1"), false),
+                new UsableBroker(10, Optional.of("1"), false)).iterator());
+        assertEquals("Invalid replication factor -1: the replication factor must be positive.",
+                assertThrows(InvalidReplicationFactorException.class,
+                        () -> rackList.place(-1)).getMessage());
+    }
 }

Mime
View raw message