This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new c71ea4c KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests (#8786)
c71ea4c is described below
commit c71ea4ccf4fc99ca12f48b3df01e16ec5d8ed7d6
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Tue Jun 2 20:39:02 2020 -0700
KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests (#8786)
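Minimal fix needed to stop this test from failing and to unblock others: partitionMovements is now re-created at the start of every assign() call instead of once at field initialization.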
Co-authored-by: Luke Chen <showuon@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
.../kafka/clients/consumer/internals/AbstractStickyAssignor.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 9743688..a1af6d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -43,7 +43,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor
{
public static final int DEFAULT_GENERATION = -1;
- private PartitionMovements partitionMovements = new PartitionMovements();
+ private PartitionMovements partitionMovements;
// Keep track of the partitions being migrated from one consumer to another during assignment
// so the cooperative assignor can adjust the assignment
@@ -72,6 +72,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor
{
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer>
partitionsPerTopic,
Map<String, Subscription> subscriptions)
{
+ partitionMovements = new PartitionMovements();
Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions))
{
log.debug("Detected that all consumers were subscribed to same set of topics,
invoking the "
|