kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8179: PartitionAssignorAdapter (#7110)
Date Wed, 31 Jul 2019 20:53:57 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fbac3c   KAFKA-8179: PartitionAssignorAdapter (#7110)
6fbac3c is described below

commit 6fbac3cfa88bd3eba36b2d3e254d7ce5e11a550b
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Wed Jul 31 13:53:38 2019 -0700

     KAFKA-8179: PartitionAssignorAdapter (#7110)
    
    Follow up to new PartitionAssignor interface merged in 7108 is merged
    
    Adds a PartitionAssignorAdapter class to maintain backwards compatibility
    
    Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   4 +-
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +-
 .../consumer/ConsumerPartitionAssignor.java        |  10 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   7 +-
 .../internals/AbstractPartitionAssignor.java       |   4 +-
 .../consumer/internals/ConsumerCoordinator.java    |   3 +-
 .../consumer/internals/PartitionAssignor.java      |   3 +
 .../internals/PartitionAssignorAdapter.java        | 136 ++++++++++++++++
 .../internals/PartitionAssignorAdapterTest.java    | 173 +++++++++++++++++++++
 docs/upgrade.html                                  |   2 +
 .../internals/StreamsPartitionAssignor.java        |   4 +-
 11 files changed, 333 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f2ee21e..227a03b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
 import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.Cluster;
@@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient {
                     for (DescribedGroupMember groupMember : members) {
                         Set<TopicPartition> partitions = Collections.emptySet();
                         if (groupMember.memberAssignment().length > 0) {
-                            final ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.
+                            final Assignment assignment = ConsumerProtocol.
                                 deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
                             partitions = new HashSet<>(assignment.partitions());
                         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 8a18bd5..2e4507a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>partition.assignment.strategy</code>
      */
     public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
-    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name or class
type of the assignor implementing the partition assignment strategy that the client will use
to distribute partition ownership amongst consumer instances when group management is used.
A custom assignor that implements ConsumerPartitionAssignor can be plugged in";
+    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names
or class types, ordered by preference, of supported assignors responsible for the partition
assignment strategy that the client will use to distribute partition ownership amongst consumer
instances when group management is used. Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code>
interface allows you to plug in a custom assignment strategy.";
 
     /**
      * <code>auto.offset.reset</code>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index 72d5d6e..07e153e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -44,7 +44,9 @@ public interface ConsumerPartitionAssignor {
      * Return serialized data that will be included in the {@link Subscription} sent to the
leader
      * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack
information)
      *
-     * @return optional join subscription user data
+     * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
+     *               and variants
+     * @return nullable subscription user data
      */
     default ByteBuffer subscriptionUserData(Set<String> topics) {
         return null;
@@ -53,11 +55,11 @@ public interface ConsumerPartitionAssignor {
     /**
      * Perform the group assignment given the member subscriptions and current cluster metadata.
      * @param metadata Current topic/broker metadata known by consumer
-     * @param subscriptions Subscriptions from all members including metadata provided through
{@link #subscriptionUserData(Set)}
-     * @return A map from the members to their respective assignment. This should have one
entry
+     * @param groupSubscription Subscriptions from all members including metadata provided
through {@link #subscriptionUserData(Set)}
+     * @return A map from the members to their respective assignments. This should have one
entry
      *         for each member in the input subscription map.
      */
-    GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions);
+    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
 
     /**
      * Callback which is invoked when a group member receives its assignment from the leader.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 30944b3..b33b1f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import static org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.getAssignorInstances;
+
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
@@ -765,9 +767,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
                     retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                     heartbeatIntervalMs); //Will avoid blocking an extended period of time
to prevent heartbeat thread starvation
-            this.assignors = config.getConfiguredInstances(
-                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
-                    ConsumerPartitionAssignor.class);
+
+            this.assignors = getAssignorInstances(config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
config.originals());
 
             // no coordinator will be constructed for the default (null) group id
             this.coordinator = groupId == null ? null :
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 3b966b0..ed0282b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -48,8 +48,8 @@ public abstract class AbstractPartitionAssignor implements ConsumerPartitionAssi
                                                              Map<String, Subscription>
subscriptions);
 
     @Override
-    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscriptions)
{
-        Map<String, Subscription> subscriptions = groupSubscriptions.groupSubscription();
+    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription)
{
+        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
         Set<String> allSubscribedTopics = new HashSet<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())
             allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a28119d..aae16b9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -204,8 +204,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.joinedSubscription = subscriptions.subscription();
         JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
 
+        List<String> topics = new ArrayList<>(joinedSubscription);
         for (ConsumerPartitionAssignor assignor : assignors) {
-            Subscription subscription = new Subscription(new ArrayList<>(joinedSubscription),
+            Subscription subscription = new Subscription(topics,
                                                          assignor.subscriptionUserData(joinedSubscription),
                                                          subscriptions.assignedPartitionsList());
             ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index b3f2ada..1ecb15c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -36,6 +36,9 @@ import java.util.Set;
  * assignment decisions. For this, you can override {@link #subscription(Set)} and provide
custom
  * userData in the returned Subscription. For example, to have a rack-aware assignor, an
implementation
  * can use this user data to forward the rackId belonging to each member.
+ *
+ * This interface has been deprecated in 2.4, custom assignors should now implement
+ * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
  */
 @Deprecated
 public interface PartitionAssignor {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapter.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapter.java
new file mode 100644
index 0000000..d1d5e20
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This adapter class is used to ensure backwards compatibility for those who have implemented
the {@link PartitionAssignor}
+ * interface, which has been deprecated in favor of the new {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
+ */
+@SuppressWarnings("deprecation")
+public class PartitionAssignorAdapter implements ConsumerPartitionAssignor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignorAdapter.class);
+    private final PartitionAssignor oldAssignor;
+
+    PartitionAssignorAdapter(PartitionAssignor oldAssignor) {
+        this.oldAssignor = oldAssignor;
+    }
+
+    @Override
+    public ByteBuffer subscriptionUserData(Set<String> topics) {
+        return oldAssignor.subscription(topics).userData();
+    }
+
+    @Override
+    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription)
{
+        return toNewGroupAssignment(oldAssignor.assign(metadata, toOldGroupSubscription(groupSubscription)));
+    }
+
+    @Override
+    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
+        oldAssignor.onAssignment(toOldAssignment(assignment), metadata.generationId());
+    }
+
+    @Override
+    public String name() {
+        return oldAssignor.name();
+    }
+
+    private static PartitionAssignor.Assignment toOldAssignment(Assignment newAssignment)
{
+        return new PartitionAssignor.Assignment(newAssignment.partitions(), newAssignment.userData());
+    }
+
+    private static Map<String, PartitionAssignor.Subscription> toOldGroupSubscription(GroupSubscription
newSubscriptions) {
+        Map<String, PartitionAssignor.Subscription> oldSubscriptions = new HashMap<>();
+        for (Map.Entry<String, Subscription> entry : newSubscriptions.groupSubscription().entrySet())
{
+            String member = entry.getKey();
+            Subscription newSubscription = entry.getValue();
+            oldSubscriptions.put(member, new PartitionAssignor.Subscription(
+                newSubscription.topics(), newSubscription.userData()));
+        }
+        return oldSubscriptions;
+    }
+
+    private static GroupAssignment toNewGroupAssignment(Map<String, PartitionAssignor.Assignment>
oldAssignments) {
+        Map<String, Assignment> newAssignments = new HashMap<>();
+        for (Map.Entry<String, PartitionAssignor.Assignment> entry : oldAssignments.entrySet())
{
+            String member = entry.getKey();
+            PartitionAssignor.Assignment oldAssignment = entry.getValue();
+            newAssignments.put(member, new Assignment(oldAssignment.partitions(), oldAssignment.userData()));
+        }
+        return new GroupAssignment(newAssignments);
+    }
+
+    /**
+     * Get a list of configured instances of {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor}
+     * based on the class names/types specified by {@link org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
+     * where any instances of the old {@link PartitionAssignor} interface are wrapped in
an adapter to the new
+     * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor} interface
+     */
+    public static List<ConsumerPartitionAssignor> getAssignorInstances(List<String>
assignorClasses, Map<String, Object> configs) {
+        List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
+
+        if (assignorClasses == null)
+            return assignors;
+
+        for (Object klass : assignorClasses) {
+            // first try to get the class if passed in as a string
+            if (klass instanceof String) {
+                try {
+                    klass = Class.forName((String) klass, true, Utils.getContextOrKafkaClassLoader());
+                } catch (ClassNotFoundException classNotFound) {
+                    throw new KafkaException(klass + " ClassNotFoundException exception occurred",
classNotFound);
+                }
+            }
+
+            if (klass instanceof Class<?>) {
+                Object assignor = Utils.newInstance((Class<?>) klass);
+                if (assignor instanceof Configurable)
+                    ((Configurable) assignor).configure(configs);
+
+                if (assignor instanceof ConsumerPartitionAssignor) {
+                    assignors.add((ConsumerPartitionAssignor) assignor);
+                } else if (assignor instanceof PartitionAssignor) {
+                    assignors.add(new PartitionAssignorAdapter((PartitionAssignor) assignor));
+                    LOG.warn("The PartitionAssignor interface has been deprecated, "
+                        + "please implement the ConsumerPartitionAssignor interface instead.");
+                } else {
+                    throw new KafkaException(klass + " is not an instance of " + PartitionAssignor.class.getName()
+                        + " or an instance of " + ConsumerPartitionAssignor.class.getName());
+                }
+            } else {
+                throw new KafkaException("List contains element of type " + klass.getClass().getName()
+ ", expected String or Class");
+            }
+        }
+        return assignors;
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapterTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapterTest.java
new file mode 100644
index 0000000..52950cb
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PartitionAssignorAdapterTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import static org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter.getAssignorInstances;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.Test;
+
+public class PartitionAssignorAdapterTest {
+
+    private List<String> classNames;
+    private List<Object> classTypes;
+
+    @Test
+    public void shouldInstantiateNewAssignors() {
+        classNames = Arrays.asList(StickyAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames,
Collections.emptyMap());
+        assertTrue(StickyAssignor.class.isInstance(assignors.get(0)));
+    }
+
+    @Test
+    public void shouldAdaptOldAssignors() {
+        classNames = Arrays.asList(OldPartitionAssignor.class.getName());
+        List<ConsumerPartitionAssignor> assignors = getAssignorInstances(classNames,
Collections.emptyMap());
+        assertTrue(PartitionAssignorAdapter.class.isInstance(assignors.get(0)));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnNonAssignor() {
+        classNames = Arrays.asList(String.class.getName());
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnAssignorNotFound() {
+        classNames = Arrays.asList("Non-existent assignor");
+        assertThrows(KafkaException.class, () -> getAssignorInstances(classNames, Collections.emptyMap()));
+    }
+
+    @Test
+    public void shouldInstantiateFromListOfOldAndNewClassTypes() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        classTypes = Arrays.asList(StickyAssignor.class, OldPartitionAssignor.class);
+
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes);
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
+            props, new StringDeserializer(), new StringDeserializer());
+
+        consumer.close();
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionOnListWithNonAssignorClassType() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        classTypes = Arrays.asList(StickyAssignor.class, OldPartitionAssignor.class, String.class);
+
+        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classTypes);
+        assertThrows(KafkaException.class, () -> new KafkaConsumer<>(
+            props, new StringDeserializer(), new StringDeserializer()));
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testOnAssignment() {
+        OldPartitionAssignor oldAssignor = new OldPartitionAssignor();
+        ConsumerPartitionAssignor adaptedAssignor = new PartitionAssignorAdapter(oldAssignor);
+
+        TopicPartition tp1 = new TopicPartition("tp1", 1);
+        TopicPartition tp2 = new TopicPartition("tp2", 2);
+        List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
+
+        adaptedAssignor.onAssignment(new Assignment(partitions), new ConsumerGroupMetadata("",
1, "", Optional.empty()));
+
+        assertEquals(oldAssignor.partitions, partitions);
+    }
+
+    @Test
+    public void testAssign() {
+        ConsumerPartitionAssignor adaptedAssignor = new PartitionAssignorAdapter(new OldPartitionAssignor());
+
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("C1", new Subscription(Arrays.asList("topic1")));
+        subscriptions.put("C2", new Subscription(Arrays.asList("topic1", "topic2")));
+        subscriptions.put("C3", new Subscription(Arrays.asList("topic2", "topic3")));
+        GroupSubscription groupSubscription = new GroupSubscription(subscriptions);
+
+        Map<String, Assignment> assignments = adaptedAssignor.assign(null, groupSubscription).groupAssignment();
+
+        assertEquals(assignments.get("C1").partitions(), Arrays.asList(new TopicPartition("topic1",
1)));
+        assertEquals(assignments.get("C2").partitions(), Arrays.asList(new TopicPartition("topic1",
1), new TopicPartition("topic2", 1)));
+        assertEquals(assignments.get("C3").partitions(), Arrays.asList(new TopicPartition("topic2",
1), new TopicPartition("topic3", 1)));
+    }
+
+    /*
+     * Dummy assignor just gives each consumer partition 1 of each topic it's subscribed
to
+     */
+    @SuppressWarnings("deprecation")
+    public static class OldPartitionAssignor implements PartitionAssignor {
+
+        List<TopicPartition> partitions = null;
+
+        @Override
+        public Subscription subscription(Set<String> topics) {
+            return new Subscription(new ArrayList<>(topics), null);
+        }
+
+        @Override
+        public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription>
subscriptions) {
+            Map<String, Assignment> assignments = new HashMap<>();
+            for (Map.Entry<String, Subscription> entry : subscriptions.entrySet())
{
+                List<TopicPartition> partitions = new ArrayList<>();
+                for (String topic : entry.getValue().topics()) {
+                    partitions.add(new TopicPartition(topic, 1));
+                }
+                assignments.put(entry.getKey(), new Assignment(partitions, null));
+            }
+            return assignments;
+        }
+
+        @Override
+        public void onAssignment(Assignment assignment) {
+            partitions = assignment.partitions();
+        }
+
+        @Override
+        public String name() {
+            return "old-assignor";
+        }
+    }
+
+}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 9d0e738..0ae5904 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -29,6 +29,8 @@
         avoid potential misuse. The constructor <code>TopicAuthorizationException(String)</code>
which was previously used for a single
         unauthorized topic was changed similarly.
     </li>
+    <li>The internal <code>PartitionAssignor</code> interface has been
deprecated and replaced with a new <code>ConsumerPartitionAssignor</code> in the
public API. Users
+        implementing a custom PartitionAssignor should migrate to the new interface as soon
as possible.</li>
 </ul>
 
 <h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index fa5f511..36196a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -373,8 +373,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
      * 3. within each client, tasks are assigned to consumer clients in round-robin manner.
      */
     @Override
-    public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscriptions)
{
-        final Map<String, Subscription> subscriptions = groupSubscriptions.groupSubscription();
+    public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription)
{
+        final Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
         // construct the client metadata from the decoded subscription info
         final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
         final Set<String> futureConsumers = new HashSet<>();


Mime
View raw message