kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvcep...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6145: KIP 441 remove balance factor (#8597)
Date Fri, 01 May 2020 14:59:41 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95edaba  KAFKA-6145: KIP 441 remove balance factor (#8597)
95edaba is described below

commit 95edaba8615d4ca2d623722eee38eb78fc24d317
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Fri May 1 07:59:08 2020 -0700

    KAFKA-6145: KIP 441 remove balance factor (#8597)
    
    Reviewers: John Roesler <vvcephei@apache.org>
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 12 +---
 .../internals/StreamsPartitionAssignor.java        |  4 --
 .../assignment/AssignorConfiguration.java          |  4 --
 .../internals/assignment/BalancedAssignor.java     |  3 +-
 .../assignment/DefaultBalancedAssignor.java        | 10 ++--
 .../assignment/HighAvailabilityTaskAssignor.java   |  3 +-
 .../apache/kafka/streams/StreamsConfigTest.java    | 12 ----
 .../internals/StreamsPartitionAssignorTest.java    |  2 -
 .../assignment/DefaultBalancedAssignorTest.java    | 69 +++-------------------
 .../assignment/FallbackPriorTaskAssignorTest.java  |  2 +-
 .../HighAvailabilityTaskAssignorTest.java          |  5 --
 .../assignment/StickyTaskAssignorTest.java         |  4 +-
 .../assignment/TaskAssignorConvergenceTest.java    |  4 --
 13 files changed, 18 insertions(+), 116 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 1df6839..4688848 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -329,10 +329,6 @@ public class StreamsConfig extends AbstractConfig {
     public static final String APPLICATION_SERVER_CONFIG = "application.server";
     private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on this KafkaStreams instance.";
 
-    /** {@code balance.factor} */
-    public static final String BALANCE_FACTOR_CONFIG = "balance.factor";
-    private static final String BALANCE_FACTOR_DOC = "Maximum difference in the number of stateful (and total) active tasks assigned to the stream thread with the most tasks and the stream thread with the least in a steady-state assignment. Must be at least 1.";
-
     /** {@code bootstrap.servers} */
     @SuppressWarnings("WeakerAccess")
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
@@ -462,7 +458,7 @@ public class StreamsConfig extends AbstractConfig {
     /** {@code probing.rebalance.interval.ms} */
     public static final String PROBING_REBALANCE_INTERVAL_MS_CONFIG = "probing.rebalance.interval.ms";
     private static final String PROBING_REBALANCE_INTERVAL_MS_DOC = "The maximum time to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active. Probing rebalances " +
-                                                                        "will continue to be triggered until the assignment is balanced according to the " + BALANCE_FACTOR_CONFIG + ". Must be at least 1 minute.";
+                                                                        "will continue to be triggered until the assignment is balanced. Must be at least 1 minute.";
 
     /** {@code processing.guarantee} */
     @SuppressWarnings("WeakerAccess")
@@ -681,12 +677,6 @@ public class StreamsConfig extends AbstractConfig {
                     "",
                     Importance.LOW,
                     APPLICATION_SERVER_DOC)
-            .define(BALANCE_FACTOR_CONFIG,
-                    Type.INT,
-                    1,
-                    atLeast(1),
-                    Importance.LOW,
-                    BALANCE_FACTOR_DOC)
             .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                     Type.INT,
                     1000,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 666da21..242cff7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -1572,10 +1572,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         return assignmentConfigs.acceptableRecoveryLag;
     }
 
-    int balanceFactor() {
-        return assignmentConfigs.balanceFactor;
-    }
-
     int maxWarmupReplicas() {
         return assignmentConfigs.maxWarmupReplicas;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 2a5d1d4..8ea63fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -342,7 +342,6 @@ public final class AssignorConfiguration {
 
     public static class AssignmentConfigs {
         public final long acceptableRecoveryLag;
-        public final int balanceFactor;
         public final int maxWarmupReplicas;
         public final int numStandbyReplicas;
         public final long probingRebalanceIntervalMs;
@@ -350,7 +349,6 @@ public final class AssignorConfiguration {
         private AssignmentConfigs(final StreamsConfig configs) {
             this(
                 configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-                configs.getInt(StreamsConfig.BALANCE_FACTOR_CONFIG),
                 configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
                 configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
                 configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG)
@@ -358,12 +356,10 @@ public final class AssignorConfiguration {
         }
 
         AssignmentConfigs(final Long acceptableRecoveryLag,
-                          final Integer balanceFactor,
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
                           final Long probingRebalanceIntervalMs) {
             this.acceptableRecoveryLag = acceptableRecoveryLag;
-            this.balanceFactor = balanceFactor;
             this.maxWarmupReplicas = maxWarmupReplicas;
             this.numStandbyReplicas = numStandbyReplicas;
             this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java
index 5c2136a..70c9070 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalancedAssignor.java
@@ -26,6 +26,5 @@ public interface BalancedAssignor {
 
     Map<UUID, List<TaskId>> assign(final SortedSet<UUID> clients,
                                    final SortedSet<TaskId> tasks,
-                                   final Map<UUID, Integer> clientsToNumberOfStreamThreads,
-                                   final int balanceFactor);
+                                   final Map<UUID, Integer> clientsToNumberOfStreamThreads);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java
index 60d96c6..c75c831 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignor.java
@@ -31,12 +31,11 @@ public class DefaultBalancedAssignor implements BalancedAssignor {
     @Override
     public Map<UUID, List<TaskId>> assign(final SortedSet<UUID> clients,
                                           final SortedSet<TaskId> tasks,
-                                          final Map<UUID, Integer> clientsToNumberOfStreamThreads,
-                                          final int balanceFactor) {
+                                          final Map<UUID, Integer> clientsToNumberOfStreamThreads) {
         final Map<UUID, List<TaskId>> assignment = new HashMap<>();
         clients.forEach(client -> assignment.put(client, new ArrayList<>()));
         distributeTasksEvenlyOverClients(assignment, clients, tasks);
-        balanceTasksOverStreamThreads(assignment, clients, clientsToNumberOfStreamThreads, balanceFactor);
+        balanceTasksOverStreamThreads(assignment, clients, clientsToNumberOfStreamThreads);
         return assignment;
     }
 
@@ -58,8 +57,7 @@ public class DefaultBalancedAssignor implements BalancedAssignor {
 
     private void balanceTasksOverStreamThreads(final Map<UUID, List<TaskId>> assignment,
                                                final SortedSet<UUID> clients,
-                                               final Map<UUID, Integer> clientsToNumberOfStreamThreads,
-                                               final int balanceFactor) {
+                                               final Map<UUID, Integer> clientsToNumberOfStreamThreads) {
         boolean stop = false;
         while (!stop) {
             stop = true;
@@ -74,7 +72,7 @@ public class DefaultBalancedAssignor implements BalancedAssignor {
                         destinationTasks.size() / clientsToNumberOfStreamThreads.get(destinationClient);
                     final int assignedTasksPerStreamThreadAtSource =
                         sourceTasks.size() / clientsToNumberOfStreamThreads.get(sourceClient);
-                    if (assignedTasksPerStreamThreadAtSource - assignedTasksPerStreamThreadAtDestination > balanceFactor) {
+                    if (assignedTasksPerStreamThreadAtSource - assignedTasksPerStreamThreadAtDestination > 1) {
                         final Iterator<TaskId> sourceIterator = sourceTasks.iterator();
                         final TaskId taskToMove = sourceIterator.next();
                         sourceIterator.remove();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
index 5dbf099..404b793 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
@@ -99,8 +99,7 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
         final Map<UUID, List<TaskId>> statefulActiveTaskAssignment = new DefaultBalancedAssignor().assign(
             sortedClients,
             statefulTasks,
-            clientsToNumberOfThreads,
-            configs.balanceFactor
+            clientsToNumberOfThreads
         );
 
         return assignTaskMovements(
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index a6701b0..0b4e614 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -939,18 +939,6 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSetDefaultBalanceFactor() {
-        final StreamsConfig config = new StreamsConfig(props);
-        assertThat(config.getInt(StreamsConfig.BALANCE_FACTOR_CONFIG), is(1));
-    }
-
-    @Test
-    public void shouldThrowConfigExceptionIfBalanceFactorIsOutsideBounds() {
-        props.put(StreamsConfig.BALANCE_FACTOR_CONFIG, 0);
-        assertThrows(ConfigException.class, () -> new StreamsConfig(props));
-    }
-
-    @Test
     public void shouldSetDefaultNumStandbyReplicas() {
         final StreamsConfig config = new StreamsConfig(props);
         assertThat(config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), is(0));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 814bbcf..840d48e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -1743,7 +1743,6 @@ public class StreamsPartitionAssignorTest {
 
         final Map<String, Object> props = configProps();
         props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 11);
-        props.put(StreamsConfig.BALANCE_FACTOR_CONFIG, 22);
         props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 33);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 44);
         props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 55 * 60 * 1000L);
@@ -1751,7 +1750,6 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.configure(props);
 
         assertThat(partitionAssignor.acceptableRecoveryLag(), equalTo(11L));
-        assertThat(partitionAssignor.balanceFactor(), equalTo(22));
         assertThat(partitionAssignor.maxWarmupReplicas(), equalTo(33));
         assertThat(partitionAssignor.numStandbyReplicas(), equalTo(44));
         assertThat(partitionAssignor.probingRebalanceIntervalMs(), equalTo(55 * 60 * 1000L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java
index b45a33f..9a3c661 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/DefaultBalancedAssignorTest.java
@@ -51,8 +51,6 @@ public class DefaultBalancedAssignorTest {
 
     @Test
     public void shouldAssignTasksEvenlyOverClientsWhereNumberOfClientsIntegralDivisorOfNumberOfTasks() {
-        final int balanceFactor = 1;
-
         final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
             THREE_CLIENTS,
             mkSortedSet(
@@ -66,8 +64,7 @@ public class DefaultBalancedAssignorTest {
                 TASK_2_1,
                 TASK_2_2
             ),
-            threeClientsToNumberOfStreamThreads(1, 1, 1),
-            balanceFactor
+            threeClientsToNumberOfStreamThreads(1, 1, 1)
         );
 
         final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
@@ -81,8 +78,6 @@ public class DefaultBalancedAssignorTest {
 
     @Test
     public void shouldAssignTasksEvenlyOverClientsWhereNumberOfClientsNotIntegralDivisorOfNumberOfTasks() {
-        final int balanceFactor = 1;
-
         final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
             TWO_CLIENTS,
             mkSortedSet(
@@ -96,8 +91,7 @@ public class DefaultBalancedAssignorTest {
                 TASK_2_1,
                 TASK_2_2
             ),
-            twoClientsToNumberOfStreamThreads(1, 1),
-            balanceFactor
+            twoClientsToNumberOfStreamThreads(1, 1)
         );
 
         final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_0_2, TASK_1_1, TASK_2_0, TASK_2_2);
@@ -110,8 +104,6 @@ public class DefaultBalancedAssignorTest {
 
     @Test
     public void shouldAssignTasksEvenlyOverClientsWhereNumberOfStreamThreadsIntegralDivisorOfNumberOfTasks() {
-        final int balanceFactor = 1;
-
         final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
             THREE_CLIENTS,
             mkSortedSet(
@@ -125,8 +117,7 @@ public class DefaultBalancedAssignorTest {
                 TASK_2_1,
                 TASK_2_2
             ),
-            threeClientsToNumberOfStreamThreads(3, 3, 3),
-            balanceFactor
+            threeClientsToNumberOfStreamThreads(3, 3, 3)
         );
 
         final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
@@ -140,8 +131,6 @@ public class DefaultBalancedAssignorTest {
 
     @Test
     public void shouldAssignTasksEvenlyOverClientsWhereNumberOfStreamThreadsNotIntegralDivisorOfNumberOfTasks() {
-        final int balanceFactor = 1;
-
         final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
             THREE_CLIENTS,
             mkSortedSet(
@@ -155,8 +144,7 @@ public class DefaultBalancedAssignorTest {
                 TASK_2_1,
                 TASK_2_2
             ),
-            threeClientsToNumberOfStreamThreads(2, 2, 2),
-            balanceFactor
+            threeClientsToNumberOfStreamThreads(2, 2, 2)
         );
 
         final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
@@ -170,8 +158,6 @@ public class DefaultBalancedAssignorTest {
 
     @Test
     public void shouldAssignTasksEvenlyOverUnevenlyDistributedStreamThreads() {
-        final int balanceFactor = 1;
-
         final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
             THREE_CLIENTS,
             mkSortedSet(
@@ -185,8 +171,7 @@ public class DefaultBalancedAssignorTest {
                 TASK_2_1,
                 TASK_2_2
             ),
-            threeClientsToNumberOfStreamThreads(1, 2, 3),
-            balanceFactor
+            threeClientsToNumberOfStreamThreads(1, 2, 3)
         );
 
         final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_1_0, TASK_2_0);
@@ -200,16 +185,13 @@ public class DefaultBalancedAssignorTest {
 
     @Test
     public void shouldAssignTasksEvenlyOverClientsWithLessClientsThanTasks() {
-        final int balanceFactor = 1;
-
         final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
             THREE_CLIENTS,
             mkSortedSet(
                 TASK_0_0,
                 TASK_0_1
             ),
-            threeClientsToNumberOfStreamThreads(1, 1, 1),
-            balanceFactor
+            threeClientsToNumberOfStreamThreads(1, 1, 1)
         );
 
         final List<TaskId> assignedTasksForClient1 = Collections.singletonList(TASK_0_0);
@@ -223,8 +205,6 @@ public class DefaultBalancedAssignorTest {
 
     @Test
     public void shouldAssignTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks() {
-        final int balanceFactor = 1;
-
         final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
             THREE_CLIENTS,
             mkSortedSet(
@@ -238,8 +218,7 @@ public class DefaultBalancedAssignorTest {
                 TASK_2_1,
                 TASK_2_2
             ),
-            threeClientsToNumberOfStreamThreads(6, 6, 6),
-            balanceFactor
+            threeClientsToNumberOfStreamThreads(6, 6, 6)
         );
 
         final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_1_0, TASK_2_0);
@@ -253,8 +232,6 @@ public class DefaultBalancedAssignorTest {
 
     @Test
     public void shouldAssignTasksEvenlyOverStreamThreadsButBestEffortOverClients() {
-        final int balanceFactor = 1;
-
         final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
             TWO_CLIENTS,
             mkSortedSet(
@@ -268,8 +245,7 @@ public class DefaultBalancedAssignorTest {
                 TASK_2_1,
                 TASK_2_2
             ),
-            twoClientsToNumberOfStreamThreads(6, 2),
-            balanceFactor
+            twoClientsToNumberOfStreamThreads(6, 2)
         );
 
         final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_0_2, TASK_1_1, TASK_2_0, TASK_2_2,
@@ -281,35 +257,6 @@ public class DefaultBalancedAssignorTest {
         );
     }
 
-    @Test
-    public void shouldAssignTasksEvenlyOverClientsButNotOverStreamThreadsBecauseBalanceFactorSatisfied() {
-        final int balanceFactor = 2;
-
-        final Map<UUID, List<TaskId>> assignment = new DefaultBalancedAssignor().assign(
-            TWO_CLIENTS,
-            mkSortedSet(
-                TASK_0_0,
-                TASK_0_1,
-                TASK_0_2,
-                TASK_1_0,
-                TASK_1_1,
-                TASK_1_2,
-                TASK_2_0,
-                TASK_2_1,
-                TASK_2_2
-            ),
-            twoClientsToNumberOfStreamThreads(6, 2),
-            balanceFactor
-        );
-
-        final List<TaskId> assignedTasksForClient1 = Arrays.asList(TASK_0_0, TASK_0_2, TASK_1_1, TASK_2_0, TASK_2_2);
-        final List<TaskId> assignedTasksForClient2 = Arrays.asList(TASK_0_1, TASK_1_0, TASK_1_2, TASK_2_1);
-        assertThat(
-            assignment,
-            is(expectedAssignmentForTwoClients(assignedTasksForClient1, assignedTasksForClient2))
-        );
-    }
-
     private static Map<UUID, Integer> twoClientsToNumberOfStreamThreads(final int numberOfStreamThread1,
                                                                           final int numberOfStreamThread2) {
         return mkMap(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
index 687e5b6..9cd2da1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java
@@ -53,7 +53,7 @@ public class FallbackPriorTaskAssignorTest {
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
-            new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, 0, 0L)
+            new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, 0L)
         );
         assertThat(probingRebalanceNeeded, is(true));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index d3fcbbe..dd9b79b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -61,7 +61,6 @@ import static org.hamcrest.Matchers.not;
 public class HighAvailabilityTaskAssignorTest {
     private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs(
         /*acceptableRecoveryLag*/ 100L,
-        /*balanceFactor*/ 1,
         /*maxWarmupReplicas*/ 2,
         /*numStandbyReplicas*/ 0,
         /*probingRebalanceIntervalMs*/ 60 * 1000L
@@ -69,13 +68,11 @@ public class HighAvailabilityTaskAssignorTest {
 
     private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
         /*acceptableRecoveryLag*/ 100L,
-        /*balanceFactor*/ 1,
         /*maxWarmupReplicas*/ 2,
         /*numStandbyReplicas*/ 1,
         /*probingRebalanceIntervalMs*/ 60 * 1000L
     );
 
-
     @Test
     public void shouldComputeNewAssignmentIfThereAreUnassignedActiveTasks() {
         final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1);
@@ -286,7 +283,6 @@ public class HighAvailabilityTaskAssignorTest {
             statefulTasks,
             new AssignmentConfigs(
                 /*acceptableRecoveryLag*/ 100L,
-                /*balanceFactor*/ 1,
                 /*maxWarmupReplicas*/ 1,
                 /*numStandbyReplicas*/ 0,
                 /*probingRebalanceIntervalMs*/ 60 * 1000L
@@ -315,7 +311,6 @@ public class HighAvailabilityTaskAssignorTest {
             statefulTasks,
             new AssignmentConfigs(
                 /*acceptableRecoveryLag*/ 100L,
-                /*balanceFactor*/ 1,
                 /*maxWarmupReplicas*/ 1,
                 /*numStandbyReplicas*/ 1,
                 /*probingRebalanceIntervalMs*/ 60 * 1000L
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 5203832..85c3947 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -668,7 +668,7 @@ public class StickyTaskAssignorTest {
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
-            new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, 0, 0L)
+            new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, 0L)
         );
         assertThat(probingRebalanceNeeded, is(false));
 
@@ -687,7 +687,7 @@ public class StickyTaskAssignorTest {
             clients,
             new HashSet<>(taskIds),
             new HashSet<>(taskIds),
-            new AssignorConfiguration.AssignmentConfigs(0L, 0, 0, numStandbys, 0L)
+            new AssignorConfiguration.AssignmentConfigs(0L, 0, numStandbys, 0L)
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
index 9517400..c85eac7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
@@ -232,7 +232,6 @@ public class TaskAssignorConvergenceTest {
     @Test
     public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
         final AssignmentConfigs configs = new AssignmentConfigs(100L,
-                                                                1,
                                                                 2,
                                                                 0,
                                                                 1000L);
@@ -251,7 +250,6 @@ public class TaskAssignorConvergenceTest {
         final int numStandbyReplicas = 0;
 
         final AssignmentConfigs configs = new AssignmentConfigs(100L,
-                                                                1,
                                                                 maxWarmupReplicas,
                                                                 numStandbyReplicas,
                                                                 1000L);
@@ -273,7 +271,6 @@ public class TaskAssignorConvergenceTest {
         final int numStandbyReplicas = 0;
 
         final AssignmentConfigs configs = new AssignmentConfigs(100L,
-                                                                1,
                                                                 maxWarmupReplicas,
                                                                 numStandbyReplicas,
                                                                 1000L);
@@ -314,7 +311,6 @@ public class TaskAssignorConvergenceTest {
             final int numberOfEvents = prng.nextInt(10) + 1;
 
             final AssignmentConfigs configs = new AssignmentConfigs(100L,
-                                                                    1,
                                                                     maxWarmupReplicas,
                                                                     numStandbyReplicas,
                                                                     1000L);


Mime
View raw message