kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108)
Date Thu, 25 Jul 2019 20:02:38 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69d86a1  KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108)
69d86a1 is described below

commit 69d86a197f86ad4c6f1636b5ab4678907e30a4c0
Author: A. Sophie Blee-Goldman <sophie@confluent.io>
AuthorDate: Thu Jul 25 13:02:09 2019 -0700

    KAFKA-8179: add public ConsumerPartitionAssignor interface (#7108)
    
    Main changes of this PR:
    
    * Deprecate old consumer.internals.PartitionAssignor and add public consumer.ConsumerPartitionAssignor with all OOTB assignors migrated to new interface
    * Refactor assignor's assignment/subscription related classes to make the API easier to evolve
    * Remove version number from classes as it is only needed for serialization/deserialization
    * Other previously-discussed cleanup included in this PR:
    
    * Remove Assignment.error added in pt 1
    * Remove ConsumerCoordinator#adjustAssignment added in pt 2
    
    Reviewers: Boyang Chen <boyang@confluent.io>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   4 +-
 .../kafka/clients/consumer/ConsumerConfig.java     |   2 +-
 .../clients/consumer/ConsumerGroupMetadata.java    |  50 +++++
 ...ssignor.java => ConsumerPartitionAssignor.java} | 210 ++++++++-------------
 .../kafka/clients/consumer/KafkaConsumer.java      |   7 +-
 .../kafka/clients/consumer/StickyAssignor.java     |  11 +-
 .../internals/AbstractPartitionAssignor.java       |  18 +-
 .../consumer/internals/ConsumerCoordinator.java    |  86 +++------
 .../consumer/internals/ConsumerProtocol.java       |  95 ++++------
 .../consumer/internals/PartitionAssignor.java      | 142 +-------------
 .../consumer/internals/SubscriptionState.java      |   8 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   6 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  69 ++++---
 .../kafka/clients/consumer/RangeAssignorTest.java  |   2 +-
 .../clients/consumer/RoundRobinAssignorTest.java   |   2 +-
 .../kafka/clients/consumer/StickyAssignorTest.java |   2 +-
 .../internals/ConsumerCoordinatorTest.java         |  31 +--
 .../consumer/internals/ConsumerProtocolTest.java   |  45 +----
 .../group/GroupMetadataManagerTest.scala           |   2 +-
 .../internals/StreamsPartitionAssignor.java        |  20 +-
 .../internals/StreamsPartitionAssignorTest.java    | 126 +++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  21 ++-
 22 files changed, 363 insertions(+), 596 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 349fc2a..f2ee21e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -29,9 +29,9 @@ import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
 import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.ElectionType;
@@ -2724,7 +2724,7 @@ public class KafkaAdminClient extends AdminClient {
                     for (DescribedGroupMember groupMember : members) {
                         Set<TopicPartition> partitions = Collections.emptySet();
                         if (groupMember.memberAssignment().length > 0) {
-                            final PartitionAssignor.Assignment assignment = ConsumerProtocol.
+                            final ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.
                                 deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
                             partitions = new HashSet<>(assignment.partitions());
                         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7eb34d4..8a18bd5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -102,7 +102,7 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>partition.assignment.strategy</code>
      */
     public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
-    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used";
+    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name or class type of the assignor implementing the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. A custom assignor that implements ConsumerPartitionAssignor can be plugged in";
 
     /**
      * <code>auto.offset.reset</code>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
new file mode 100644
index 0000000..e17894b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerGroupMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Optional;
+
+public class ConsumerGroupMetadata {
+    private String groupId;
+    private int generationId;
+    private String memberId;
+    Optional<String> groupInstanceId;
+
+    public ConsumerGroupMetadata(String groupId, int generationId, String memberId, Optional<String> groupInstanceId) {
+        this.groupId = groupId;
+        this.generationId = generationId;
+        this.memberId = memberId;
+        this.groupInstanceId = groupInstanceId;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public int generationId() {
+        return generationId;
+    }
+
+    public String memberId() {
+        return memberId;
+    }
+
+    public Optional<String> groupInstanceId() {
+        return groupInstanceId;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
similarity index 52%
copy from clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
copy to clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index c26f684..72d5d6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -14,69 +14,57 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.SchemaException;
+package org.apache.kafka.clients.consumer;
 
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
-
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0;
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V1;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * This interface is used to define custom partition assignment for use in
  * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
  * to the topics they are interested in and forward their subscriptions to a Kafka broker serving
  * as the group coordinator. The coordinator selects one member to perform the group assignment and
- * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, Map)} is called
+ * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, GroupSubscription)} is called
  * to perform the assignment and the results are forwarded back to each respective members
  *
  * In some cases, it is useful to forward additional metadata to the assignor in order to make
- * assignment decisions. For this, you can override {@link #subscription(Set)} and provide custom
+ * assignment decisions. For this, you can override {@link #subscriptionUserData(Set)} and provide custom
  * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
  * can use this user data to forward the rackId belonging to each member.
  */
-public interface PartitionAssignor {
+public interface ConsumerPartitionAssignor {
 
     /**
-     * Return a serializable object representing the local member's subscription. This can include
-     * additional information as well (e.g. local host/rack information) which can be leveraged in
-     * {@link #assign(Cluster, Map)}.
-     * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
-     *               and variants
-     * @return Non-null subscription with optional user data
+     * Return serialized data that will be included in the {@link Subscription} sent to the leader
+     * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} (e.g. local host/rack information)
+     *
+     * @return optional join subscription user data
      */
-    Subscription subscription(Set<String> topics);
+    default ByteBuffer subscriptionUserData(Set<String> topics) {
+        return null;
+    }
 
     /**
      * Perform the group assignment given the member subscriptions and current cluster metadata.
      * @param metadata Current topic/broker metadata known by consumer
-     * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)}
+     * @param subscriptions Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)}
      * @return A map from the members to their respective assignment. This should have one entry
-     *         for all members who in the input subscription map.
-     */
-    Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);
-
-    /**
-     * Callback which is invoked when a group member receives its assignment from the leader.
-     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)}
+     *         for each member in the input subscription map.
      */
-    void onAssignment(Assignment assignment);
+    GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions);
 
     /**
      * Callback which is invoked when a group member receives its assignment from the leader.
-     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)}
-     * @param generation The consumer group generation associated with this partition assignment (optional)
+     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)}
+     * @param metadata Additional metadata on the consumer (optional)
      */
-    default void onAssignment(Assignment assignment, int generation) {
-        onAssignment(assignment);
+    default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
     }
 
     /**
@@ -96,92 +84,45 @@ public interface PartitionAssignor {
     }
 
     /**
-     * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky")
+     * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky"). Note, this is not required
+     * to be the same as the class name specified in {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
      * @return non-null unique name
      */
     String name();
 
-    enum RebalanceProtocol {
-        EAGER((byte) 0), COOPERATIVE((byte) 1);
-
-        private final byte id;
-
-        RebalanceProtocol(byte id) {
-            this.id = id;
-        }
-
-        public byte id() {
-            return id;
-        }
-
-        public static RebalanceProtocol forId(byte id) {
-            switch (id) {
-                case 0:
-                    return EAGER;
-                case 1:
-                    return COOPERATIVE;
-                default:
-                    throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
-            }
-        }
-    }
-
-    class Subscription {
-        private final Short version;
+    final class Subscription {
         private final List<String> topics;
         private final ByteBuffer userData;
         private final List<TopicPartition> ownedPartitions;
         private Optional<String> groupInstanceId;
 
-        Subscription(Short version,
-                     List<String> topics,
-                     ByteBuffer userData,
-                     List<TopicPartition> ownedPartitions) {
-            this.version = version;
+        public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
             this.topics = topics;
             this.userData = userData;
             this.ownedPartitions = ownedPartitions;
             this.groupInstanceId = Optional.empty();
-
-            if (version < CONSUMER_PROTOCOL_V0)
-                throw new SchemaException("Unsupported subscription version: " + version);
-
-            if (version < CONSUMER_PROTOCOL_V1 && !ownedPartitions.isEmpty())
-                throw new IllegalArgumentException("Subscription version smaller than 1 should not have owned partitions");
-        }
-
-        Subscription(Short version, List<String> topics, ByteBuffer userData) {
-            this(version, topics, userData, Collections.emptyList());
-        }
-
-        public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
         }
 
         public Subscription(List<String> topics, ByteBuffer userData) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData);
+            this(topics, userData, Collections.emptyList());
         }
 
         public Subscription(List<String> topics) {
-            this(topics, ByteBuffer.wrap(new byte[0]));
-        }
-
-        Short version() {
-            return version;
+            this(topics, null, Collections.emptyList());
         }
 
         public List<String> topics() {
             return topics;
         }
 
-        public List<TopicPartition> ownedPartitions() {
-            return ownedPartitions;
-        }
-
         public ByteBuffer userData() {
             return userData;
         }
 
+        public List<TopicPartition> ownedPartitions() {
+            return ownedPartitions;
+        }
+
         public void setGroupInstanceId(Optional<String> groupInstanceId) {
             this.groupInstanceId = groupInstanceId;
         }
@@ -189,79 +130,76 @@ public interface PartitionAssignor {
         public Optional<String> groupInstanceId() {
             return groupInstanceId;
         }
-
-        @Override
-        public String toString() {
-            return "Subscription(" +
-                    "version=" + version +
-                    ", topics=" + topics +
-                    ", ownedPartitions=" + ownedPartitions +
-                    ", group.instance.id=" + groupInstanceId + ")";
-        }
     }
 
-    class Assignment {
-        private final Short version;
+    final class Assignment {
         private List<TopicPartition> partitions;
-        private final ByteBuffer userData;
-        private ConsumerProtocol.AssignmentError error;
+        private ByteBuffer userData;
 
-        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData, ConsumerProtocol.AssignmentError error) {
-            this.version = version;
+        public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
             this.partitions = partitions;
             this.userData = userData;
-            this.error = error;
-
-            if (version < CONSUMER_PROTOCOL_V0)
-                throw new SchemaException("Unsupported subscription version: " + version);
-
-            if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.AssignmentError.NONE)
-                throw new IllegalArgumentException("Assignment version smaller than 1 should not have error code.");
         }
 
-        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData) {
-            this(version, partitions, userData, ConsumerProtocol.AssignmentError.NONE);
+        public Assignment(List<TopicPartition> partitions) {
+            this(partitions, null);
         }
 
-        public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
-            this(CONSUMER_PROTOCOL_V1, partitions, userData);
+        public List<TopicPartition> partitions() {
+            return partitions;
         }
 
-        public Assignment(List<TopicPartition> partitions) {
-            this(partitions, ByteBuffer.wrap(new byte[0]));
+        public ByteBuffer userData() {
+            return userData;
         }
+    }
 
-        Short version() {
-            return version;
+    final class GroupSubscription {
+        private final Map<String, Subscription> subscriptions;
+
+        public GroupSubscription(Map<String, Subscription> subscriptions) {
+            this.subscriptions = subscriptions;
         }
 
-        public List<TopicPartition> partitions() {
-            return partitions;
+        public Map<String, Subscription> groupSubscription() {
+            return subscriptions;
         }
+    }
+
+    final class GroupAssignment {
+        private final Map<String, Assignment> assignments;
 
-        public ConsumerProtocol.AssignmentError error() {
-            return error;
+        public GroupAssignment(Map<String, Assignment> assignments) {
+            this.assignments = assignments;
         }
 
-        public void updatePartitions(List<TopicPartition> partitions) {
-            this.partitions = partitions;
+        public Map<String, Assignment> groupAssignment() {
+            return assignments;
         }
+    }
+
+    enum RebalanceProtocol {
+        EAGER((byte) 0), COOPERATIVE((byte) 1);
+
+        private final byte id;
 
-        public void setError(ConsumerProtocol.AssignmentError error) {
-            this.error = error;
+        RebalanceProtocol(byte id) {
+            this.id = id;
         }
 
-        public ByteBuffer userData() {
-            return userData;
+        public byte id() {
+            return id;
         }
 
-        @Override
-        public String toString() {
-            return "Assignment(" +
-                    "version=" + version +
-                    ", partitions=" + partitions +
-                    ", error=" + error +
-                    ')';
+        public static RebalanceProtocol forId(byte id) {
+            switch (id) {
+                case 0:
+                    return EAGER;
+                case 1:
+                    return COOPERATIVE;
+                default:
+                    throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
+            }
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index fa5cc99..30944b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -581,7 +580,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final long requestTimeoutMs;
     private final int defaultApiTimeoutMs;
     private volatile boolean closed = false;
-    private List<PartitionAssignor> assignors;
+    private List<ConsumerPartitionAssignor> assignors;
 
     // currentThread holds the threadId of the current thread accessing KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -768,7 +767,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation
             this.assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
-                    PartitionAssignor.class);
+                    ConsumerPartitionAssignor.class);
 
             // no coordinator will be constructed for the default (null) group id
             this.coordinator = groupId == null ? null :
@@ -833,7 +832,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   long retryBackoffMs,
                   long requestTimeoutMs,
                   int defaultApiTimeoutMs,
-                  List<PartitionAssignor> assignors,
+                  List<ConsumerPartitionAssignor> assignors,
                   String groupId) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index 3c7d010..3311cd8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -365,18 +365,17 @@ public class StickyAssignor extends AbstractPartitionAssignor {
     }
 
     @Override
-    public void onAssignment(Assignment assignment, int generation) {
+    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
         memberAssignment = assignment.partitions();
-        this.generation = generation;
+        this.generation = metadata.generationId();
     }
 
     @Override
-    public Subscription subscription(Set<String> topics) {
+    public ByteBuffer subscriptionUserData(Set<String> topics) {
         if (memberAssignment == null)
-            return new Subscription(new ArrayList<>(topics));
+            return null;
 
-        return new Subscription(new ArrayList<>(topics),
-                serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation))));
+        return serializeTopicPartitionAssignment(new ConsumerUserData(memberAssignment, Optional.of(generation)));
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 2487daa..3b966b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -33,7 +34,7 @@ import java.util.Set;
  * Abstract assignor implementation which does some common grunt work (in particular collecting
  * partition counts which are always needed in assignors).
  */
-public abstract class AbstractPartitionAssignor implements PartitionAssignor {
+public abstract class AbstractPartitionAssignor implements ConsumerPartitionAssignor {
     private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class);
 
     /**
@@ -47,12 +48,8 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
                                                              Map<String, Subscription> subscriptions);
 
     @Override
-    public Subscription subscription(Set<String> topics) {
-        return new Subscription(new ArrayList<>(topics));
-    }
-
-    @Override
-    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscriptions) {
+        Map<String, Subscription> subscriptions = groupSubscriptions.groupSubscription();
         Set<String> allSubscribedTopics = new HashSet<>();
         for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())
             allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());
@@ -72,12 +69,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
         Map<String, Assignment> assignments = new HashMap<>();
         for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
             assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
-        return assignments;
-    }
-
-    @Override
-    public void onAssignment(Assignment assignment) {
-        // this assignor maintains no internal state, so nothing to do
+        return new GroupAssignment(assignments);
     }
 
     protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 4866986..a28119d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -18,13 +18,16 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor.RebalanceProtocol;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -78,7 +81,7 @@ import java.util.stream.Collectors;
 public final class ConsumerCoordinator extends AbstractCoordinator {
     private final GroupRebalanceConfig rebalanceConfig;
     private final Logger log;
-    private final List<PartitionAssignor> assignors;
+    private final List<ConsumerPartitionAssignor> assignors;
     private final ConsumerMetadata metadata;
     private final ConsumerCoordinatorMetrics sensors;
     private final SubscriptionState subscriptions;
@@ -128,7 +131,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
                                LogContext logContext,
                                ConsumerNetworkClient client,
-                               List<PartitionAssignor> assignors,
+                               List<ConsumerPartitionAssignor> assignors,
                                ConsumerMetadata metadata,
                                SubscriptionState subscriptions,
                                Metrics metrics,
@@ -170,13 +173,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (!assignors.isEmpty()) {
             List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
 
-            for (PartitionAssignor assignor : assignors) {
+            for (ConsumerPartitionAssignor assignor : assignors) {
                 supportedProtocols.retainAll(assignor.supportedProtocols());
             }
 
             if (supportedProtocols.isEmpty()) {
                 throw new IllegalArgumentException("Specified assignors " +
-                    assignors.stream().map(PartitionAssignor::name).collect(Collectors.toSet()) +
+                    assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) +
                     " do not have commonly supported rebalance protocol");
             }
 
@@ -201,8 +204,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         this.joinedSubscription = subscriptions.subscription();
         JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
 
-        for (PartitionAssignor assignor : assignors) {
-            Subscription subscription = assignor.subscription(joinedSubscription);
+        for (ConsumerPartitionAssignor assignor : assignors) {
+            Subscription subscription = new Subscription(new ArrayList<>(joinedSubscription),
+                                                         assignor.subscriptionUserData(joinedSubscription),
+                                                         subscriptions.assignedPartitionsList());
             ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
 
             protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
@@ -220,8 +225,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             metadata.requestUpdateForNewTopics();
     }
 
-    private PartitionAssignor lookupAssignor(String name) {
-        for (PartitionAssignor assignor : this.assignors) {
+    private ConsumerPartitionAssignor lookupAssignor(String name) {
+        for (ConsumerPartitionAssignor assignor : this.assignors) {
             if (assignor.name().equals(name))
                 return assignor;
         }
@@ -261,7 +266,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (!isLeader)
             assignmentSnapshot = null;
 
-        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
+        ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
@@ -285,7 +290,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         maybeUpdateJoinedSubscription(assignedPartitions);
 
         // give the assignor a chance to update internal state based on the received assignment
-        assignor.onAssignment(assignment, generation);
+        ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
+        assignor.onAssignment(assignment, metadata);
 
         // reschedule the auto commit starting from now
         if (autoCommitEnabled)
@@ -314,10 +320,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             case COOPERATIVE:
                 assignAndRevoke(listener, assignedPartitions, ownedPartitions);
 
-                if (assignment.error() == ConsumerProtocol.AssignmentError.NEED_REJOIN) {
-                    requestRejoin();
-                }
-
                 break;
         }
 
@@ -470,20 +472,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                         String assignmentStrategy,
                                                         List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
-        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
+        ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
 
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, Subscription> subscriptions = new HashMap<>();
-        // collect all the owned partitions
-        Map<TopicPartition, String> ownedPartitions = new HashMap<>();
         for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
             Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
             subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
             subscriptions.put(memberSubscription.memberId(), subscription);
             allSubscribedTopics.addAll(subscription.topics());
-            ownedPartitions.putAll(subscription.ownedPartitions().stream().collect(Collectors.toMap(item -> item, item -> memberSubscription.memberId())));
         }
 
         // the leader will begin watching for changes to any of the topics the group is interested in,
@@ -494,16 +493,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions);
 
-        Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), subscriptions);
-
-        switch (protocol) {
-            case EAGER:
-                break;
-
-            case COOPERATIVE:
-                adjustAssignment(ownedPartitions, assignments);
-                break;
-        }
+        Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
 
         // user-customized assignor may have created some topics that are not in the subscription list
         // and assign their partitions to the members; in this case we would like to update the leader's
@@ -547,40 +537,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         return groupAssignment;
     }
 
-    private void adjustAssignment(final Map<TopicPartition, String> ownedPartitions,
-                                  final Map<String, Assignment> assignments) {
-        boolean revocationsNeeded = false;
-        Set<TopicPartition> assignedPartitions = new HashSet<>();
-        for (final Map.Entry<String, Assignment> entry : assignments.entrySet()) {
-            final Assignment assignment = entry.getValue();
-            assignedPartitions.addAll(assignment.partitions());
-
-            // update the assignment if the partition is owned by another different owner
-            List<TopicPartition> updatedPartitions = assignment.partitions().stream()
-                .filter(tp -> ownedPartitions.containsKey(tp) && !entry.getKey().equals(ownedPartitions.get(tp)))
-                .collect(Collectors.toList());
-            if (!updatedPartitions.equals(assignment.partitions())) {
-                assignment.updatePartitions(updatedPartitions);
-                revocationsNeeded = true;
-            }
-        }
-
-        // for all owned but not assigned partitions, blindly add them to assignment
-        for (final Map.Entry<TopicPartition, String> entry : ownedPartitions.entrySet()) {
-            final TopicPartition tp = entry.getKey();
-            if (!assignedPartitions.contains(tp)) {
-                assignments.get(entry.getValue()).partitions().add(tp);
-            }
-        }
-
-        // if revocations are triggered, tell everyone to re-join immediately.
-        if (revocationsNeeded) {
-            for (final Assignment assignment : assignments.values()) {
-                assignment.setError(ConsumerProtocol.AssignmentError.NEED_REJOIN);
-            }
-        }
-    }
-
     @Override
     protected void onJoinPrepare(int generation, String memberId) {
         // commit offsets prior to rebalance if auto-commit enabled
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index d05d5b0..e852e62 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.util.Collections;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -49,7 +52,6 @@ import java.util.Map;
  *     Topic            => String
  *     Partitions       => [int32]
  *   UserData           => Bytes
- *   ErrorCode          => [int16]
  * </pre>
  *
  * Version 0 format:
@@ -85,11 +87,11 @@ public class ConsumerProtocol {
     public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
     public static final String USER_DATA_KEY_NAME = "user_data";
 
-    public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Assignment error code");
-
     public static final short CONSUMER_PROTOCOL_V0 = 0;
     public static final short CONSUMER_PROTOCOL_V1 = 1;
 
+    public static final short CONSUMER_PROTOCL_LATEST_VERSION = CONSUMER_PROTOCOL_V1;
+
     public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema(
             new Field(VERSION_KEY_NAME, Type.INT16));
     private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
@@ -116,36 +118,9 @@ public class ConsumerProtocol {
 
     public static final Schema ASSIGNMENT_V1 = new Schema(
         new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
-        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES),
-        ERROR_CODE);
-
-    public enum AssignmentError {
-        NONE(0),
-        NEED_REJOIN(1);
+        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
 
-        private final short code;
-
-        AssignmentError(final int code) {
-            this.code = (short) code;
-        }
-
-        public short code() {
-            return code;
-        }
-
-        public static AssignmentError fromCode(final short code) {
-            switch (code) {
-                case 0:
-                    return NONE;
-                case 1:
-                    return NEED_REJOIN;
-                default:
-                    throw new IllegalArgumentException("Unknown error code: " + code);
-            }
-        }
-    }
-
-    public static ByteBuffer serializeSubscriptionV0(PartitionAssignor.Subscription subscription) {
+    public static ByteBuffer serializeSubscriptionV0(Subscription subscription) {
         Struct struct = new Struct(SUBSCRIPTION_V0);
         struct.set(USER_DATA_KEY_NAME, subscription.userData());
         struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
@@ -157,7 +132,7 @@ public class ConsumerProtocol {
         return buffer;
     }
 
-    public static ByteBuffer serializeSubscriptionV1(PartitionAssignor.Subscription subscription) {
+    public static ByteBuffer serializeSubscriptionV1(Subscription subscription) {
         Struct struct = new Struct(SUBSCRIPTION_V1);
         struct.set(USER_DATA_KEY_NAME, subscription.userData());
         struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
@@ -178,8 +153,12 @@ public class ConsumerProtocol {
         return buffer;
     }
 
-    public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) {
-        switch (subscription.version()) {
+    public static ByteBuffer serializeSubscription(Subscription subscription) {
+        return serializeSubscription(subscription, CONSUMER_PROTOCL_LATEST_VERSION);
+    }
+
+    public static ByteBuffer serializeSubscription(Subscription subscription, short version) {
+        switch (version) {
             case CONSUMER_PROTOCOL_V0:
                 return serializeSubscriptionV0(subscription);
 
@@ -192,17 +171,17 @@ public class ConsumerProtocol {
         }
     }
 
-    public static PartitionAssignor.Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
+    public static Subscription deserializeSubscriptionV0(ByteBuffer buffer) {
         Struct struct = SUBSCRIPTION_V0.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
         for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
             topics.add((String) topicObj);
 
-        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V0, topics, userData);
+        return new Subscription(topics, userData, Collections.emptyList());
     }
 
-    public static PartitionAssignor.Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
+    public static Subscription deserializeSubscriptionV1(ByteBuffer buffer) {
         Struct struct = SUBSCRIPTION_V1.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<String> topics = new ArrayList<>();
@@ -218,10 +197,10 @@ public class ConsumerProtocol {
             }
         }
 
-        return new PartitionAssignor.Subscription(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
+        return new Subscription(topics, userData, ownedPartitions);
     }
 
-    public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
+    public static Subscription deserializeSubscription(ByteBuffer buffer) {
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
 
@@ -241,7 +220,7 @@ public class ConsumerProtocol {
         }
     }
 
-    public static ByteBuffer serializeAssignmentV0(PartitionAssignor.Assignment assignment) {
+    public static ByteBuffer serializeAssignmentV0(Assignment assignment) {
         Struct struct = new Struct(ASSIGNMENT_V0);
         struct.set(USER_DATA_KEY_NAME, assignment.userData());
         List<Struct> topicAssignments = new ArrayList<>();
@@ -261,7 +240,7 @@ public class ConsumerProtocol {
         return buffer;
     }
 
-    public static ByteBuffer serializeAssignmentV1(PartitionAssignor.Assignment assignment) {
+    public static ByteBuffer serializeAssignmentV1(Assignment assignment) {
         Struct struct = new Struct(ASSIGNMENT_V1);
         struct.set(USER_DATA_KEY_NAME, assignment.userData());
         List<Struct> topicAssignments = new ArrayList<>();
@@ -273,7 +252,6 @@ public class ConsumerProtocol {
             topicAssignments.add(topicAssignment);
         }
         struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
-        struct.set(ERROR_CODE, assignment.error().code);
 
         ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V1.sizeOf() + ASSIGNMENT_V1.sizeOf(struct));
         CONSUMER_PROTOCOL_HEADER_V1.writeTo(buffer);
@@ -282,8 +260,12 @@ public class ConsumerProtocol {
         return buffer;
     }
 
-    public static ByteBuffer serializeAssignment(PartitionAssignor.Assignment assignment) {
-        switch (assignment.version()) {
+    public static ByteBuffer serializeAssignment(Assignment assignment) {
+        return serializeAssignment(assignment, CONSUMER_PROTOCL_LATEST_VERSION);
+    }
+
+    public static ByteBuffer serializeAssignment(Assignment assignment, short version) {
+        switch (version) {
             case CONSUMER_PROTOCOL_V0:
                 return serializeAssignmentV0(assignment);
 
@@ -296,7 +278,7 @@ public class ConsumerProtocol {
         }
     }
 
-    public static PartitionAssignor.Assignment deserializeAssignmentV0(ByteBuffer buffer) {
+    public static Assignment deserializeAssignmentV0(ByteBuffer buffer) {
         Struct struct = ASSIGNMENT_V0.read(buffer);
         ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
         List<TopicPartition> partitions = new ArrayList<>();
@@ -307,27 +289,14 @@ public class ConsumerProtocol {
                 partitions.add(new TopicPartition(topic, (Integer) partitionObj));
             }
         }
-        return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V0, partitions, userData);
+        return new Assignment(partitions, userData);
     }
 
-    public static PartitionAssignor.Assignment deserializeAssignmentV1(ByteBuffer buffer) {
-        Struct struct = ASSIGNMENT_V1.read(buffer);
-        ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
-        List<TopicPartition> partitions = new ArrayList<>();
-        for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) {
-            Struct assignment = (Struct) structObj;
-            String topic = assignment.getString(TOPIC_KEY_NAME);
-            for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) {
-                partitions.add(new TopicPartition(topic, (Integer) partitionObj));
-            }
-        }
-
-        AssignmentError error = AssignmentError.fromCode(struct.get(ERROR_CODE));
-
-        return new PartitionAssignor.Assignment(CONSUMER_PROTOCOL_V1, partitions, userData, error);
+    public static Assignment deserializeAssignmentV1(ByteBuffer buffer) {
+        return deserializeAssignmentV0(buffer);
     }
 
-    public static PartitionAssignor.Assignment deserializeAssignment(ByteBuffer buffer) {
+    public static Assignment deserializeAssignment(ByteBuffer buffer) {
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index c26f684..b3f2ada 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -18,18 +18,12 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.SchemaException;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V0;
-import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSUMER_PROTOCOL_V1;
-
 /**
  * This interface is used to define custom partition assignment for use in
  * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
@@ -43,6 +37,7 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.CONSU
  * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
  * can use this user data to forward the rackId belonging to each member.
  */
+@Deprecated
 public interface PartitionAssignor {
 
     /**
@@ -79,21 +74,6 @@ public interface PartitionAssignor {
         onAssignment(assignment);
     }
 
-    /**
-     * Indicate which rebalance protocol this assignor works with;
-     * By default it should always work with {@link RebalanceProtocol#EAGER}.
-     */
-    default List<RebalanceProtocol> supportedProtocols() {
-        return Collections.singletonList(RebalanceProtocol.EAGER);
-    }
-
-    /**
-     * Return the version of the assignor which indicates how the user metadata encodings
-     * and the assignment algorithm gets evolved.
-     */
-    default short version() {
-        return (short) 0;
-    }
 
     /**
      * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky")
@@ -101,156 +81,52 @@ public interface PartitionAssignor {
      */
     String name();
 
-    enum RebalanceProtocol {
-        EAGER((byte) 0), COOPERATIVE((byte) 1);
-
-        private final byte id;
-
-        RebalanceProtocol(byte id) {
-            this.id = id;
-        }
-
-        public byte id() {
-            return id;
-        }
-
-        public static RebalanceProtocol forId(byte id) {
-            switch (id) {
-                case 0:
-                    return EAGER;
-                case 1:
-                    return COOPERATIVE;
-                default:
-                    throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
-            }
-        }
-    }
-
     class Subscription {
-        private final Short version;
         private final List<String> topics;
         private final ByteBuffer userData;
-        private final List<TopicPartition> ownedPartitions;
-        private Optional<String> groupInstanceId;
 
-        Subscription(Short version,
-                     List<String> topics,
-                     ByteBuffer userData,
-                     List<TopicPartition> ownedPartitions) {
-            this.version = version;
+        public Subscription(List<String> topics, ByteBuffer userData) {
             this.topics = topics;
             this.userData = userData;
-            this.ownedPartitions = ownedPartitions;
-            this.groupInstanceId = Optional.empty();
-
-            if (version < CONSUMER_PROTOCOL_V0)
-                throw new SchemaException("Unsupported subscription version: " + version);
-
-            if (version < CONSUMER_PROTOCOL_V1 && !ownedPartitions.isEmpty())
-                throw new IllegalArgumentException("Subscription version smaller than 1 should not have owned partitions");
-        }
-
-        Subscription(Short version, List<String> topics, ByteBuffer userData) {
-            this(version, topics, userData, Collections.emptyList());
-        }
-
-        public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData, ownedPartitions);
-        }
-
-        public Subscription(List<String> topics, ByteBuffer userData) {
-            this(CONSUMER_PROTOCOL_V1, topics, userData);
         }
 
         public Subscription(List<String> topics) {
             this(topics, ByteBuffer.wrap(new byte[0]));
         }
 
-        Short version() {
-            return version;
-        }
-
         public List<String> topics() {
             return topics;
         }
 
-        public List<TopicPartition> ownedPartitions() {
-            return ownedPartitions;
-        }
-
         public ByteBuffer userData() {
             return userData;
         }
 
-        public void setGroupInstanceId(Optional<String> groupInstanceId) {
-            this.groupInstanceId = groupInstanceId;
-        }
-
-        public Optional<String> groupInstanceId() {
-            return groupInstanceId;
-        }
-
         @Override
         public String toString() {
             return "Subscription(" +
-                    "version=" + version +
-                    ", topics=" + topics +
-                    ", ownedPartitions=" + ownedPartitions +
-                    ", group.instance.id=" + groupInstanceId + ")";
+                "topics=" + topics +
+                ')';
         }
     }
 
     class Assignment {
-        private final Short version;
-        private List<TopicPartition> partitions;
+        private final List<TopicPartition> partitions;
         private final ByteBuffer userData;
-        private ConsumerProtocol.AssignmentError error;
 
-        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData, ConsumerProtocol.AssignmentError error) {
-            this.version = version;
+        public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
             this.partitions = partitions;
             this.userData = userData;
-            this.error = error;
-
-            if (version < CONSUMER_PROTOCOL_V0)
-                throw new SchemaException("Unsupported subscription version: " + version);
-
-            if (version < CONSUMER_PROTOCOL_V1 && error != ConsumerProtocol.AssignmentError.NONE)
-                throw new IllegalArgumentException("Assignment version smaller than 1 should not have error code.");
-        }
-
-        Assignment(Short version, List<TopicPartition> partitions, ByteBuffer userData) {
-            this(version, partitions, userData, ConsumerProtocol.AssignmentError.NONE);
-        }
-
-        public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
-            this(CONSUMER_PROTOCOL_V1, partitions, userData);
         }
 
         public Assignment(List<TopicPartition> partitions) {
             this(partitions, ByteBuffer.wrap(new byte[0]));
         }
 
-        Short version() {
-            return version;
-        }
-
         public List<TopicPartition> partitions() {
             return partitions;
         }
 
-        public ConsumerProtocol.AssignmentError error() {
-            return error;
-        }
-
-        public void updatePartitions(List<TopicPartition> partitions) {
-            this.partitions = partitions;
-        }
-
-        public void setError(ConsumerProtocol.AssignmentError error) {
-            this.error = error;
-        }
-
         public ByteBuffer userData() {
             return userData;
         }
@@ -258,10 +134,8 @@ public interface PartitionAssignor {
         @Override
         public String toString() {
             return "Assignment(" +
-                    "version=" + version +
-                    ", partitions=" + partitions +
-                    ", error=" + error +
-                    ')';
+                "partitions=" + partitions +
+                ')';
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 3f1cf98..af834ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.util.ArrayList;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
@@ -388,6 +389,13 @@ public class SubscriptionState {
     }
 
     /**
+     * @return a modifiable copy of the currently assigned partitions as a list
+     */
+    public synchronized List<TopicPartition> assignedPartitionsList() {
+        return new ArrayList<>(this.assignment.partitionSet());
+    }
+
+    /**
      * Provides the number of assigned partitions in a thread safe manner.
      * @return the number of assigned partitions.
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 711c8f9..769f58c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -21,9 +21,9 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaException;
@@ -1222,7 +1222,7 @@ public class KafkaAdminClientTest {
             topicPartitions.add(1, myTopicPartition1);
             topicPartitions.add(2, myTopicPartition2);
 
-            final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
+            final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
             byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
             memberAssignment.get(memberAssignmentBytes);
 
@@ -1282,7 +1282,7 @@ public class KafkaAdminClientTest {
             topicPartitions.add(1, myTopicPartition1);
             topicPartitions.add(2, myTopicPartition2);
 
-            final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(topicPartitions));
+            final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
             byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
             memberAssignment.get(memberAssignmentBytes);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c1adf19..1227c27 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -396,7 +395,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
@@ -430,7 +429,7 @@ public class KafkaConsumerTest {
 
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -465,7 +464,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final PartitionAssignor assignor = new RoundRobinAssignor();
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -489,7 +488,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final PartitionAssignor assignor = new RoundRobinAssignor();
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -512,7 +511,7 @@ public class KafkaConsumerTest {
         MockClient client = new MockClient(time, metadata);
 
         initMetadata(client, Collections.singletonMap(topic, 1));
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(singleton(tp0));
@@ -587,7 +586,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId);
@@ -611,7 +610,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId);
@@ -636,7 +635,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
                 true, groupId, groupInstanceId);
@@ -663,7 +662,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
                 true, groupId, Optional.empty());
@@ -686,7 +685,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.assign(singletonList(tp0));
@@ -724,7 +723,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -764,7 +763,7 @@ public class KafkaConsumerTest {
         initMetadata(client, partitionCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
@@ -782,7 +781,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testChangingRegexSubscription() {
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         String otherTopic = "other";
         TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
@@ -828,7 +827,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -878,7 +877,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        final PartitionAssignor assignor = new RoundRobinAssignor();
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -908,7 +907,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
@@ -948,7 +947,7 @@ public class KafkaConsumerTest {
         initMetadata(client, tpCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
@@ -1062,7 +1061,7 @@ public class KafkaConsumerTest {
         initMetadata(client, tpCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
 
@@ -1124,7 +1123,7 @@ public class KafkaConsumerTest {
         initMetadata(client, tpCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
@@ -1180,7 +1179,7 @@ public class KafkaConsumerTest {
         initMetadata(client, tpCounts);
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
 
@@ -1234,7 +1233,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
@@ -1429,7 +1428,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -1457,7 +1456,7 @@ public class KafkaConsumerTest {
                 coordinator);
 
         // join group
-        final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic)));
+        final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(singletonList(topic)));
 
         // This member becomes the leader
         String memberId = "memberId";
@@ -1512,7 +1511,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty());
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
@@ -1649,7 +1648,7 @@ public class KafkaConsumerTest {
         initMetadata(client, singletonMap(topic, 1));
         Node node = metadata.fetch().nodes().get(0);
 
-        PartitionAssignor assignor = new RangeAssignor();
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
 
         client.createPendingAuthenticationError(node, 0);
         return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
@@ -1675,7 +1674,7 @@ public class KafkaConsumerTest {
                                     subscription, new LogContext(), new ClusterResourceListeners());
     }
 
-    private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
+    private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, ConsumerPartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
             client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
@@ -1692,7 +1691,7 @@ public class KafkaConsumerTest {
                 assertTrue(protocolIterator.hasNext());
 
                 ByteBuffer protocolMetadata = ByteBuffer.wrap(protocolIterator.next().metadata());
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
+                ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata);
                 return subscribedTopics.equals(new HashSet<>(subscription.topics()));
             }
         }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
@@ -1703,7 +1702,7 @@ public class KafkaConsumerTest {
         return coordinator;
     }
 
-    private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
+    private Node prepareRebalance(MockClient client, Node node, ConsumerPartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
             client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
@@ -1764,7 +1763,7 @@ public class KafkaConsumerTest {
         return new OffsetCommitResponse(responseData);
     }
 
-    private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) {
+    private JoinGroupResponse joinGroupFollowerResponse(ConsumerPartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(
                 new JoinGroupResponseData()
                         .setErrorCode(error.code())
@@ -1777,7 +1776,7 @@ public class KafkaConsumerTest {
     }
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
-        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
+        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(partitions));
         return new SyncGroupResponse(
                 new SyncGroupResponseData()
                         .setErrorCode(error.code())
@@ -1848,7 +1847,7 @@ public class KafkaConsumerTest {
                                                       KafkaClient client,
                                                       SubscriptionState subscription,
                                                       ConsumerMetadata metadata,
-                                                      PartitionAssignor assignor,
+                                                      ConsumerPartitionAssignor assignor,
                                                       boolean autoCommitEnabled,
                                                       Optional<String> groupInstanceId) {
         return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId);
@@ -1865,7 +1864,7 @@ public class KafkaConsumerTest {
                                                       KafkaClient client,
                                                       SubscriptionState subscription,
                                                       ConsumerMetadata metadata,
-                                                      PartitionAssignor assignor,
+                                                      ConsumerPartitionAssignor assignor,
                                                       boolean autoCommitEnabled,
                                                       String groupId,
                                                       Optional<String> groupInstanceId) {
@@ -1885,7 +1884,7 @@ public class KafkaConsumerTest {
         Deserializer<String> keyDeserializer = new StringDeserializer();
         Deserializer<String> valueDeserializer = new StringDeserializer();
 
-        List<PartitionAssignor> assignors = singletonList(assignor);
+        List<ConsumerPartitionAssignor> assignors = singletonList(assignor);
         ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.emptyList());
 
         Metrics metrics = new Metrics();
@@ -1985,7 +1984,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 1));
         Cluster cluster = metadata.fetch();
 
-        PartitionAssignor assignor = new RoundRobinAssignor();
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         String invalidTopicName = "topic abc";  // Invalid topic name due to space
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
index f08ca14..118e60a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
index fa68406..02fb9ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.MemberInfo;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index 89f0d37..6dd3062 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -30,7 +30,7 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.kafka.clients.consumer.StickyAssignor.ConsumerUserData;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a0878fb..a81e73e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -125,9 +126,9 @@ public class ConsumerCoordinatorTest {
     private final MockTime time = new MockTime();
     private GroupRebalanceConfig rebalanceConfig;
 
-    private final PartitionAssignor.RebalanceProtocol protocol;
+    private final ConsumerPartitionAssignor.RebalanceProtocol protocol;
     private final MockPartitionAssignor partitionAssignor;
-    private final List<PartitionAssignor> assignors;
+    private final List<ConsumerPartitionAssignor> assignors;
     private MockClient client;
     private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
         {
@@ -144,7 +145,7 @@ public class ConsumerCoordinatorTest {
     private MockCommitCallback mockOffsetCommitCallback;
     private ConsumerCoordinator coordinator;
 
-    public ConsumerCoordinatorTest(final PartitionAssignor.RebalanceProtocol protocol) {
+    public ConsumerCoordinatorTest(final ConsumerPartitionAssignor.RebalanceProtocol protocol) {
         this.protocol = protocol;
         this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(protocol));
         this.assignors = Collections.singletonList(partitionAssignor);
@@ -153,7 +154,7 @@ public class ConsumerCoordinatorTest {
     @Parameterized.Parameters(name = "rebalance protocol = {0}")
     public static Collection<Object[]> data() {
         final List<Object[]> values = new ArrayList<>();
-        for (final PartitionAssignor.RebalanceProtocol protocol: PartitionAssignor.RebalanceProtocol.values()) {
+        for (final ConsumerPartitionAssignor.RebalanceProtocol protocol: ConsumerPartitionAssignor.RebalanceProtocol.values()) {
             values.add(new Object[]{protocol});
         }
         return values;
@@ -198,20 +199,20 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testSelectRebalanceProtcol() {
-        List<PartitionAssignor> assignors = new ArrayList<>();
-        assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.EAGER)));
-        assignors.add(new MockPartitionAssignor(Collections.singletonList(PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+        List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
+        assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER)));
+        assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
 
         // no commonly supported protocols
         assertThrows(IllegalArgumentException.class, () -> buildCoordinator(rebalanceConfig, new Metrics(), assignors, false));
 
         assignors.clear();
-        assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
-        assignors.add(new MockPartitionAssignor(Arrays.asList(PartitionAssignor.RebalanceProtocol.EAGER, PartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+        assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+        assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
 
         // select higher indexed (more advanced) protocols
         try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
-            assertEquals(PartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol());
+            assertEquals(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol());
         }
     }
 
@@ -553,7 +554,7 @@ public class ConsumerCoordinatorTest {
         final int addCount = 1;
 
         // with eager protocol we will call revoke on the old assignment as well
-        if (protocol == PartitionAssignor.RebalanceProtocol.EAGER) {
+        if (protocol == ConsumerPartitionAssignor.RebalanceProtocol.EAGER) {
             revokeCount += 1;
         }
 
@@ -670,7 +671,7 @@ public class ConsumerCoordinatorTest {
                 JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
 
                 ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
-                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
+                ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
                 metadata.rewind();
                 return subscription.topics().containsAll(updatedSubscription);
             }
@@ -2326,7 +2327,7 @@ public class ConsumerCoordinatorTest {
 
     private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig,
                                                  final Metrics metrics,
-                                                 final List<PartitionAssignor> assignors,
+                                                 final List<ConsumerPartitionAssignor> assignors,
                                                  final boolean autoCommitEnabled) {
         return new ConsumerCoordinator(
                 rebalanceConfig,
@@ -2385,7 +2386,7 @@ public class ConsumerCoordinatorTest {
                                                       Errors error) {
         List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
         for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
-            PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
+            ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
             ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
             metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
                     .setMemberId(subscriptionEntry.getKey())
@@ -2416,7 +2417,7 @@ public class ConsumerCoordinatorTest {
     }
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
-        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
+        ByteBuffer buf = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(partitions));
         return new SyncGroupResponse(
                 new SyncGroupResponseData()
                         .setErrorCode(error.code())
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 9e601b0..3cf7be5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -39,7 +39,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME;
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.USER_DATA_KEY_NAME;
 import static org.apache.kafka.clients.consumer.internals.ConsumerProtocol.VERSION_KEY_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
 import static org.apache.kafka.test.TestUtils.toSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -54,7 +53,7 @@ public class ConsumerProtocolTest {
 
     @Test
     public void serializeDeserializeMetadata() {
-        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
+        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0]));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(subscription.topics(), parsedSubscription.topics());
@@ -64,7 +63,7 @@ public class ConsumerProtocolTest {
 
     @Test
     public void serializeDeserializeMetadataAndGroupInstanceId() {
-        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
+        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), ByteBuffer.wrap(new byte[0]));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
 
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
@@ -85,8 +84,8 @@ public class ConsumerProtocolTest {
 
     @Test
     public void deserializeOldSubscriptionVersion() {
-        Subscription subscription = new Subscription((short) 0, Arrays.asList("foo", "bar"), null);
-        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
+        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
+        ByteBuffer buffer = ConsumerProtocol.serializeSubscriptionV0(subscription);
         Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
         assertEquals(parsedSubscription.topics(), parsedSubscription.topics());
         assertNull(parsedSubscription.userData());
@@ -95,7 +94,7 @@ public class ConsumerProtocolTest {
 
     @Test
     public void deserializeNewSubscriptionWithOldVersion() {
-        Subscription subscription = new Subscription((short) 1, Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
+        Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null, Collections.singletonList(tp2));
         ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
         // ignore the version assuming it is the old byte code, as it will blindly deserialize as V0
         Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
@@ -145,7 +144,7 @@ public class ConsumerProtocolTest {
     @Test
     public void serializeDeserializeAssignment() {
         List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
-        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment(partitions));
+        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment(partitions, ByteBuffer.wrap(new byte[0])));
         Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
         assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
         assertEquals(0, parsedAssignment.userData().limit());
@@ -161,29 +160,6 @@ public class ConsumerProtocolTest {
     }
 
     @Test
-    public void deserializeOldAssignmentVersion() {
-        List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
-        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 0, partitions, null));
-        Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
-        assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
-        assertNull(parsedAssignment.userData());
-        assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error());
-    }
-
-    @Test
-    public void deserializeNewAssignmentWithOldVersion() {
-        List<TopicPartition> partitions = Collections.singletonList(tp1);
-        ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new Assignment((short) 1, partitions, null, ConsumerProtocol.AssignmentError.NEED_REJOIN));
-        // ignore the version assuming it is the old byte code, as it will blindly deserialize as 0
-        Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
-        header.getShort(VERSION_KEY_NAME);
-        Assignment parsedAssignment = ConsumerProtocol.deserializeAssignmentV0(buffer);
-        assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
-        assertNull(parsedAssignment.userData());
-        assertEquals(ConsumerProtocol.AssignmentError.NONE, parsedAssignment.error());
-    }
-
-    @Test
     public void deserializeFutureAssignmentVersion() {
         // verify that a new version which adds a field is still parseable
         short version = 100;
@@ -191,7 +167,6 @@ public class ConsumerProtocolTest {
         Schema assignmentSchemaV100 = new Schema(
                 new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
                 new Field(USER_DATA_KEY_NAME, Type.BYTES),
-                ERROR_CODE,
                 new Field("foo", Type.STRING));
 
         Struct assignmentV100 = new Struct(assignmentSchemaV100);
@@ -200,7 +175,6 @@ public class ConsumerProtocolTest {
                         .set(ConsumerProtocol.TOPIC_KEY_NAME, tp1.topic())
                         .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{tp1.partition()})});
         assignmentV100.set(USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
-        assignmentV100.set(ERROR_CODE.name, ConsumerProtocol.AssignmentError.NEED_REJOIN.code());
         assignmentV100.set("foo", "bar");
 
         Struct headerV100 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA);
@@ -212,8 +186,7 @@ public class ConsumerProtocolTest {
 
         buffer.flip();
 
-        PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
+        Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
         assertEquals(toSet(Collections.singletonList(tp1)), toSet(assignment.partitions()));
-        assertEquals(ConsumerProtocol.AssignmentError.NEED_REJOIN, assignment.error());
     }
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 6761b0c..4e06716 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -31,8 +31,8 @@ import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManag
 import kafka.server.HostedPartition
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 714bb0e..fa5f511 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.nio.ByteBuffer;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
@@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
-public class StreamsPartitionAssignor implements PartitionAssignor, Configurable {
+public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
 
     final static int UNKNOWN = -1;
     private final static int VERSION_ONE = 1;
@@ -309,7 +311,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
     }
 
     @Override
-    public Subscription subscription(final Set<String> topics) {
+    public ByteBuffer subscriptionUserData(final Set<String> topics) {
         // Adds the following information to subscription
         // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
         // 2. Task ids of previously running tasks
@@ -327,7 +329,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         taskManager.updateSubscriptionsFromMetadata(topics);
 
-        return new Subscription(new ArrayList<>(topics), data.encode());
+        return data.encode();
     }
 
     private Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
@@ -371,8 +373,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
      * 3. within each client, tasks are assigned to consumer clients in round-robin manner.
      */
     @Override
-    public Map<String, Assignment> assign(final Cluster metadata,
-                                          final Map<String, Subscription> subscriptions) {
+    public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscriptions) {
+        final Map<String, Subscription> subscriptions = groupSubscriptions.groupSubscription();
         // construct the client metadata from the decoded subscription info
         final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();
         final Set<String> futureConsumers = new HashSet<>();
@@ -446,7 +448,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                     !metadata.topics().contains(topic)) {
                     log.error("Missing source topic {} durign assignment. Returning error {}.",
                               topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
-                    return errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
+                    return new GroupAssignment(errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code));
                 }
             }
             for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
@@ -644,7 +646,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
             assignment = computeNewAssignment(clientMetadataMap, partitionsForTask, partitionsByHostState, minReceivedMetadataVersion);
         }
 
-        return assignment;
+        return new GroupAssignment(assignment);
     }
 
     private Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
@@ -777,7 +779,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
      * @throws TaskAssignmentException if there is no task id for one of the partitions specified
      */
     @Override
-    public void onAssignment(final Assignment assignment) {
+    public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
         final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
         partitions.sort(PARTITION_COMPARATOR);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 5fa6653..616deaf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,7 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -146,7 +147,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.replay(taskManager);
     }
 
-    private Map<String, PartitionAssignor.Subscription> subscriptions;
+    private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions;
 
     @Before
     public void setUp() {
@@ -200,7 +201,9 @@ public class StreamsPartitionAssignorTest {
         mockTaskManager(prevTasks, cachedTasks, processId, builder);
 
         configurePartitionAssignor(Collections.emptyMap());
-        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
+
+        final Set<String> topics = Utils.mkSet("topic1", "topic2");
+        final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
 
         Collections.sort(subscription.topics());
         assertEquals(asList("topic1", "topic2"), subscription.topics());
@@ -236,16 +239,16 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partitions
         assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
@@ -320,13 +323,13 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                          new PartitionAssignor.Subscription(topics,
+                          new ConsumerPartitionAssignor.Subscription(topics,
                                   new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                          new PartitionAssignor.Subscription(topics,
+                          new ConsumerPartitionAssignor.Subscription(topics,
                                   new SubscriptionInfo(uuid1, new HashSet<>(), new HashSet<>(), userEndPoint).encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(localMetadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partitions
         assertEquals(Utils.mkSet(Utils.mkSet(t2p2, t1p0, t1p2, t2p0), Utils.mkSet(t1p1, t2p1, t1p3, t2p3)),
@@ -365,10 +368,10 @@ public class StreamsPartitionAssignorTest {
 
         // will throw exception if it fails
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()
         ));
-        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // check assignment info
         final AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
@@ -399,12 +402,12 @@ public class StreamsPartitionAssignorTest {
         configurePartitionAssignor(Collections.emptyMap());
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()
                 ));
 
         // initially metadata is empty
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions);
+        Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partitions
         assertEquals(Collections.emptySet(),
@@ -417,7 +420,7 @@ public class StreamsPartitionAssignorTest {
         assertEquals(0, allActiveTasks.size());
 
         // then metadata gets populated
-        assignments = partitionAssignor.assign(metadata, subscriptions);
+        assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
         // check assigned partitions
         assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)),
             Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions())));
@@ -455,16 +458,16 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks10, emptyTasks, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks11, emptyTasks, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid2, prevTasks20, emptyTasks, userEndPoint).encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match
         // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and
@@ -521,16 +524,16 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid2, emptyTasks, emptyTasks, userEndPoint).encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match
         assertEquals(2, assignments.get("consumer10").partitions().size());
@@ -609,16 +612,16 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode()));
 
-        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // the first consumer
         final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
@@ -666,7 +669,7 @@ public class StreamsPartitionAssignorTest {
         standbyTasks.put(task2, Utils.mkSet(t3p2));
 
         final AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState);
-        final PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode());
+        final ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(asList(t3p0, t3p3), info.encode());
 
         final Capture<Cluster> capturedCluster = EasyMock.newCapture();
         taskManager.setPartitionsByHostState(hostState);
@@ -677,7 +680,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager);
 
-        partitionAssignor.onAssignment(assignment);
+        partitionAssignor.onAssignment(assignment, null);
 
         EasyMock.verify(taskManager);
 
@@ -704,10 +707,10 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())
         );
-        partitionAssignor.assign(metadata, subscriptions);
+        partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // check prepared internal topics
         assertEquals(1, internalTopicManager.readyTopics.size());
@@ -738,10 +741,10 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())
         );
-        partitionAssignor.assign(metadata, subscriptions);
+        partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         // check prepared internal topics
         assertEquals(2, internalTopicManager.readyTopics.size());
@@ -790,11 +793,11 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
         subscriptions.put(client,
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         asList("topic1", "topic3"),
                         new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode())
         );
-        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>();
         expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
@@ -841,7 +844,8 @@ public class StreamsPartitionAssignorTest {
             uuid1,
             builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
-        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
+        final Set<String> topics = Utils.mkSet("input");
+        final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
         final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
         assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
     }
@@ -863,11 +867,11 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(topics,
+                new ConsumerPartitionAssignor.Subscription(topics,
                         new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())
         );
-        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-        final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+        final ConsumerPartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
         final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
         assertEquals(
@@ -961,11 +965,11 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
         subscriptions.put(client,
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("unknownTopic"),
                         new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode())
         );
-        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         assertThat(mockInternalTopicManager.readyTopics.isEmpty(), equalTo(true));
 
@@ -985,7 +989,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager);
 
-        partitionAssignor.onAssignment(createAssignment(hostState));
+        partitionAssignor.onAssignment(createAssignment(hostState), null);
 
         EasyMock.verify(taskManager);
     }
@@ -1015,18 +1019,18 @@ public class StreamsPartitionAssignorTest {
             mockClientSupplier.restoreConsumer));
 
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode())
         );
         subscriptions.put("consumer2",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode())
         );
         final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);
-        final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions);
-        final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
+        final Map<String, ConsumerPartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+        final ConsumerPartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
         final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
         final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090));
@@ -1109,12 +1113,12 @@ public class StreamsPartitionAssignorTest {
     private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion,
                                                                                      final int otherVersion) {
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode())
         );
         subscriptions.put("consumer2",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
                 )
@@ -1126,7 +1130,7 @@ public class StreamsPartitionAssignorTest {
             UUID.randomUUID(),
             builder);
         partitionAssignor.configure(configProps());
-        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         assertThat(assignment.size(), equalTo(2));
         assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion));
@@ -1142,7 +1146,8 @@ public class StreamsPartitionAssignorTest {
             builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100));
 
-        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+        final Set<String> topics = Utils.mkSet("topic1");
+        final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
 
         assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1));
     }
@@ -1180,7 +1185,8 @@ public class StreamsPartitionAssignorTest {
             builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue));
 
-        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+        final Set<String> topics = Utils.mkSet("topic1");
+        final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
 
         assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2));
     }
@@ -1200,12 +1206,12 @@ public class StreamsPartitionAssignorTest {
         };
 
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode())
         );
         subscriptions.put("future-consumer",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         encodeFutureSubscription())
         );
@@ -1216,7 +1222,7 @@ public class StreamsPartitionAssignorTest {
             UUID.randomUUID(),
             builder);
         partitionAssignor.configure(configProps());
-        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
 
         assertThat(assignment.size(), equalTo(2));
         assertThat(
@@ -1252,12 +1258,12 @@ public class StreamsPartitionAssignorTest {
 
     private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode())
         );
         subscriptions.put("future-consumer",
-                new PartitionAssignor.Subscription(
+                new ConsumerPartitionAssignor.Subscription(
                         Collections.singletonList("topic1"),
                         encodeFutureSubscription())
         );
@@ -1270,24 +1276,24 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.configure(configProps());
 
         try {
-            partitionAssignor.assign(metadata, subscriptions);
+            partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
             fail("Should have thrown IllegalStateException");
         } catch (final IllegalStateException expected) {
             // pass
         }
     }
 
-    private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
+    private ConsumerPartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
         final AssignmentInfo info = new AssignmentInfo(Collections.emptyList(),
                                                        Collections.emptyMap(),
                                                        firstHostState);
 
-        return new PartitionAssignor.Assignment(
+        return new ConsumerPartitionAssignor.Assignment(
                 Collections.emptyList(), info.encode());
     }
 
     private AssignmentInfo checkAssignment(final Set<String> expectedTopics,
-                                           final PartitionAssignor.Assignment assignment) {
+                                           final ConsumerPartitionAssignor.Assignment assignment) {
 
         // This assumed 1) DefaultPartitionGrouper is used, and 2) there is an only one topic group.
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 27bee81..0b2d0b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -18,8 +18,9 @@ package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -113,7 +114,7 @@ public class StreamsUpgradeTest {
         }
 
         @Override
-        public Subscription subscription(final Set<String> topics) {
+        public ByteBuffer subscriptionUserData(final Set<String> topics) {
             // Adds the following information to subscription
             // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
             // 2. Task ids of previously running tasks
@@ -133,13 +134,13 @@ public class StreamsUpgradeTest {
 
             taskManager.updateSubscriptionsFromMetadata(topics);
 
-            return new Subscription(new ArrayList<>(topics), data.encode());
+            return data.encode();
         }
 
         @Override
-        public void onAssignment(final PartitionAssignor.Assignment assignment) {
+        public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, final ConsumerGroupMetadata metadata) {
             try {
-                super.onAssignment(assignment);
+                super.onAssignment(assignment, metadata);
                 return;
             } catch (final TaskAssignmentException cannotProcessFutureVersion) {
                 // continue
@@ -183,15 +184,15 @@ public class StreamsUpgradeTest {
         }
 
         @Override
-        public Map<String, Assignment> assign(final Cluster metadata,
-                                              final Map<String, Subscription> subscriptions) {
+        public GroupAssignment assign(final Cluster metadata, final GroupSubscription groupSubscription) {
+            final Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
             Map<String, Assignment> assignment = null;
 
             final Map<String, Subscription> downgradedSubscriptions = new HashMap<>();
             for (final Subscription subscription : subscriptions.values()) {
                 final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
                 if (info.version() < SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1) {
-                    assignment = super.assign(metadata, subscriptions);
+                    assignment = super.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
                     break;
                 }
             }
@@ -219,7 +220,7 @@ public class StreamsUpgradeTest {
                                 info.userEndPoint())
                                 .encode()));
                 }
-                assignment = super.assign(metadata, downgradedSubscriptions);
+                assignment = super.assign(metadata, new GroupSubscription(downgradedSubscriptions)).groupAssignment();
                 bumpUsedVersion = true;
                 bumpSupportedVersion = true;
             }
@@ -238,7 +239,7 @@ public class StreamsUpgradeTest {
                             .encode()));
             }
 
-            return newAssignment;
+            return new GroupAssignment(newAssignment);
         }
     }
 


Mime
View raw message