kafka-commits mailing list archives

From guozh...@apache.org
Subject [7/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer
Date Wed, 21 Oct 2015 19:08:51 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
new file mode 100644
index 0000000..0020993
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ConsumerProtocol contains the schemas for consumer subscriptions and assignments for use with
+ * Kafka's generalized group management protocol. Below is the version 0 format:
+ *
+ * <pre>
+ * Subscription => Version Topics UserData
+ *   Version    => int16
+ *   Topics     => [String]
+ *   UserData   => Bytes
+ *
+ * Assignment => Version TopicPartitions UserData
+ *   Version         => int16
+ *   TopicPartitions => [Topic Partitions]
+ *     Topic         => String
+ *     Partitions    => [int32]
+ *   UserData        => Bytes
+ * </pre>
+ *
+ * The current implementation assumes that future versions will not break compatibility. When
+ * it encounters a newer version, it parses it using the current format. In effect, this means
+ * that new versions cannot remove or reorder any of the existing fields.
+ */
+public class ConsumerProtocol {
+
+    public static final String VERSION_KEY_NAME = "version";
+    public static final String TOPICS_KEY_NAME = "topics";
+    public static final String TOPIC_KEY_NAME = "topic";
+    public static final String PARTITIONS_KEY_NAME = "partitions";
+    public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
+    public static final String USER_DATA_KEY_NAME = "user_data";
+
+    public static final short CONSUMER_PROTOCOL_V0 = 0;
+    public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema(
+            new Field(VERSION_KEY_NAME, Type.INT16));
+    private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
+            .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V0);
+
+    public static final Schema SUBSCRIPTION_V0 = new Schema(
+            new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
+            new Field(USER_DATA_KEY_NAME, Type.BYTES));
+    public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema(
+            new Field(TOPIC_KEY_NAME, Type.STRING),
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
+    public static final Schema ASSIGNMENT_V0 = new Schema(
+            new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
+            new Field(USER_DATA_KEY_NAME, Type.BYTES));
+
+    public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) {
+        Struct struct = new Struct(SUBSCRIPTION_V0);
+        struct.set(USER_DATA_KEY_NAME, subscription.userData());
+        struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
+        ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V0.sizeOf() + SUBSCRIPTION_V0.sizeOf(struct));
+        CONSUMER_PROTOCOL_HEADER_V0.writeTo(buffer);
+        SUBSCRIPTION_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
+        Struct header = (Struct) CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct struct = (Struct) SUBSCRIPTION_V0.read(buffer);
+        ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
+        List<String> topics = new ArrayList<>();
+        for (Object topicObj : struct.getArray(TOPICS_KEY_NAME))
+            topics.add((String) topicObj);
+        return new PartitionAssignor.Subscription(topics, userData);
+    }
+
+    public static PartitionAssignor.Assignment deserializeAssignment(ByteBuffer buffer) {
+        Struct header = (Struct) CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct struct = (Struct) ASSIGNMENT_V0.read(buffer);
+        ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
+        List<TopicPartition> partitions = new ArrayList<>();
+        for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) {
+            Struct assignment = (Struct) structObj;
+            String topic = assignment.getString(TOPIC_KEY_NAME);
+            for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) {
+                Integer partition = (Integer) partitionObj;
+                partitions.add(new TopicPartition(topic, partition));
+            }
+        }
+        return new PartitionAssignor.Assignment(partitions, userData);
+    }
+
+    public static ByteBuffer serializeAssignment(PartitionAssignor.Assignment assignment) {
+        Struct struct = new Struct(ASSIGNMENT_V0);
+        struct.set(USER_DATA_KEY_NAME, assignment.userData());
+        List<Struct> topicAssignments = new ArrayList<>();
+        for (Map.Entry<String, List<Integer>> topicEntry : asMap(assignment.partitions()).entrySet()) {
+            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT_V0);
+            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
+            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
+            topicAssignments.add(topicAssignment);
+        }
+        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
+        ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V0.sizeOf() + ASSIGNMENT_V0.sizeOf(struct));
+        CONSUMER_PROTOCOL_HEADER_V0.writeTo(buffer);
+        ASSIGNMENT_V0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    private static void checkVersionCompatibility(short version) {
+        // check for invalid versions
+        if (version < CONSUMER_PROTOCOL_V0)
+            throw new SchemaException("Unsupported subscription version: " + version);
+
+        // otherwise, assume versions can be parsed as V0
+    }
+
+
+    private static Map<String, List<Integer>> asMap(Collection<TopicPartition> partitions) {
+        Map<String, List<Integer>> partitionMap = new HashMap<>();
+        for (TopicPartition partition : partitions) {
+            String topic = partition.topic();
+            List<Integer> topicPartitions = partitionMap.get(topic);
+            if (topicPartitions == null) {
+                topicPartitions = new ArrayList<>();
+                partitionMap.put(topic, topicPartitions);
+            }
+            topicPartitions.add(partition.partition());
+        }
+        return partitionMap;
+    }
+
+}
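
For illustration (not part of this patch), a minimal round trip through the schemas above might look like the following sketch, using the PartitionAssignor.Subscription type introduced later in this commit:

import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;

import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConsumerProtocolRoundTrip {
    public static void main(String[] args) {
        // Build a subscription with two topics and empty user data.
        PartitionAssignor.Subscription subscription =
                new PartitionAssignor.Subscription(Arrays.asList("foo", "bar"));

        // serializeSubscription writes the version header first, then the V0 body.
        ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);

        // deserializeSubscription reads the header, checks the version, and parses the body.
        PartitionAssignor.Subscription parsed =
                ConsumerProtocol.deserializeSubscription(buffer);
        System.out.println(parsed.topics()); // prints [foo, bar]
    }
}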

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
deleted file mode 100644
index 98193e8..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ /dev/null
@@ -1,848 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.apache.kafka.clients.ClientResponse;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.DisconnectException;
-import org.apache.kafka.common.errors.UnknownConsumerIdException;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ConsumerMetadataRequest;
-import org.apache.kafka.common.requests.ConsumerMetadataResponse;
-import org.apache.kafka.common.requests.HeartbeatRequest;
-import org.apache.kafka.common.requests.HeartbeatResponse;
-import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.JoinGroupResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.OffsetFetchRequest;
-import org.apache.kafka.common.requests.OffsetFetchResponse;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This class manages the coordination process with the consumer coordinator.
- */
-public final class Coordinator implements Closeable {
-
-    private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
-
-    private final ConsumerNetworkClient client;
-    private final Time time;
-    private final String groupId;
-    private final Heartbeat heartbeat;
-    private final HeartbeatTask heartbeatTask;
-    private final int sessionTimeoutMs;
-    private final String assignmentStrategy;
-    private final SubscriptionState subscriptions;
-    private final CoordinatorMetrics sensors;
-    private final long requestTimeoutMs;
-    private final long retryBackoffMs;
-    private final OffsetCommitCallback defaultOffsetCommitCallback;
-    private final boolean autoCommitEnabled;
-
-    private Node consumerCoordinator;
-    private String consumerId;
-    private int generation;
-
-    /**
-     * Initialize the coordination manager.
-     */
-    public Coordinator(ConsumerNetworkClient client,
-                       String groupId,
-                       int sessionTimeoutMs,
-                       int heartbeatIntervalMs,
-                       String assignmentStrategy,
-                       SubscriptionState subscriptions,
-                       Metrics metrics,
-                       String metricGrpPrefix,
-                       Map<String, String> metricTags,
-                       Time time,
-                       long requestTimeoutMs,
-                       long retryBackoffMs,
-                       OffsetCommitCallback defaultOffsetCommitCallback,
-                       boolean autoCommitEnabled,
-                       long autoCommitIntervalMs) {
-        this.client = client;
-        this.time = time;
-        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
-        this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
-        this.groupId = groupId;
-        this.consumerCoordinator = null;
-        this.subscriptions = subscriptions;
-        this.sessionTimeoutMs = sessionTimeoutMs;
-        this.assignmentStrategy = assignmentStrategy;
-        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
-        this.heartbeatTask = new HeartbeatTask();
-        this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
-        this.requestTimeoutMs = requestTimeoutMs;
-        this.retryBackoffMs = retryBackoffMs;
-        this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
-        this.autoCommitEnabled = autoCommitEnabled;
-
-        if (autoCommitEnabled)
-            scheduleAutoCommitTask(autoCommitIntervalMs);
-    }
-
-    /**
-     * Refresh the committed offsets for provided partitions.
-     */
-    public void refreshCommittedOffsetsIfNeeded() {
-        if (subscriptions.refreshCommitsNeeded()) {
-            Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
-            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
-                TopicPartition tp = entry.getKey();
-                // verify assignment is still active
-                if (subscriptions.isAssigned(tp))
-                    this.subscriptions.committed(tp, entry.getValue());
-            }
-            this.subscriptions.commitsRefreshed();
-        }
-    }
-
-    /**
-     * Fetch the current committed offsets from the coordinator for a set of partitions.
-     * @param partitions The partitions to fetch offsets for
-     * @return A map from partition to the committed offset
-     */
-    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
-        while (true) {
-            ensureCoordinatorKnown();
-
-            // contact coordinator to fetch committed offsets
-            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
-            client.poll(future);
-
-            if (future.succeeded())
-                return future.value();
-
-            if (!future.isRetriable())
-                throw future.exception();
-
-            Utils.sleep(retryBackoffMs);
-        }
-    }
-
-    /**
-     * Ensure that we have a valid partition assignment from the coordinator.
-     */
-    public void ensurePartitionAssignment() {
-        if (!subscriptions.partitionAssignmentNeeded())
-            return;
-
-        // commit offsets prior to rebalance if auto-commit enabled
-        maybeAutoCommitOffsetsSync();
-
-        ConsumerRebalanceListener listener = subscriptions.listener();
-
-        // execute the user's listener before rebalance
-        log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
-        try {
-            Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
-            listener.onPartitionsRevoked(revoked);
-        } catch (Exception e) {
-            log.error("User provided listener " + listener.getClass().getName()
-                    + " failed on partition revocation: ", e);
-        }
-
-        reassignPartitions();
-
-        // execute the user's listener after rebalance
-        log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
-        try {
-            Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
-            listener.onPartitionsAssigned(assigned);
-        } catch (Exception e) {
-            log.error("User provided listener " + listener.getClass().getName()
-                    + " failed on partition assignment: ", e);
-        }
-    }
-
-    private void reassignPartitions() {
-        while (subscriptions.partitionAssignmentNeeded()) {
-            ensureCoordinatorKnown();
-
-            // ensure that there are no pending requests to the coordinator. This is important
-            // in particular to avoid resending a pending JoinGroup request.
-            if (client.pendingRequestCount(this.consumerCoordinator) > 0) {
-                client.awaitPendingRequests(this.consumerCoordinator);
-                continue;
-            }
-
-            RequestFuture<Void> future = sendJoinGroupRequest();
-            client.poll(future);
-
-            if (future.failed()) {
-                if (future.exception() instanceof UnknownConsumerIdException)
-                    continue;
-                else if (!future.isRetriable())
-                    throw future.exception();
-                Utils.sleep(retryBackoffMs);
-            }
-        }
-    }
-
-    /**
-     * Block until the coordinator for this group is known.
-     */
-    public void ensureCoordinatorKnown() {
-        while (coordinatorUnknown()) {
-            RequestFuture<Void> future = sendConsumerMetadataRequest();
-            client.poll(future, requestTimeoutMs);
-
-            if (future.failed())
-                client.awaitMetadataUpdate();
-        }
-    }
-
-
-    @Override
-    public void close() {
-        // commit offsets prior to closing if auto-commit enabled
-        while (true) {
-            try {
-                maybeAutoCommitOffsetsSync();
-                return;
-            } catch (ConsumerWakeupException e) {
-                // ignore wakeups while closing to ensure we have a chance to commit
-                continue;
-            }
-        }
-    }
-
-    private class HeartbeatTask implements DelayedTask {
-
-        public void reset() {
-            // start or restart the heartbeat task to be executed at the next chance
-            long now = time.milliseconds();
-            heartbeat.resetSessionTimeout(now);
-            client.unschedule(this);
-            client.schedule(this, now);
-        }
-
-        @Override
-        public void run(final long now) {
-            if (!subscriptions.partitionsAutoAssigned() ||
-                    subscriptions.partitionAssignmentNeeded() ||
-                    coordinatorUnknown())
-                // no need to send if we're not using auto-assignment or if we are
-                // awaiting a rebalance
-                return;
-
-            if (heartbeat.sessionTimeoutExpired(now)) {
-                // we haven't received a successful heartbeat in one session interval
-                // so mark the coordinator dead
-                coordinatorDead();
-                return;
-            }
-
-            if (!heartbeat.shouldHeartbeat(now)) {
-                // we don't need to heartbeat now, so reschedule for when we do
-                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
-            } else {
-                heartbeat.sentHeartbeat(now);
-                RequestFuture<Void> future = sendHeartbeatRequest();
-                future.addListener(new RequestFutureListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {
-                        long now = time.milliseconds();
-                        heartbeat.receiveHeartbeat(now);
-                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
-                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
-                    }
-
-                    @Override
-                    public void onFailure(RuntimeException e) {
-                        client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
-                    }
-                });
-            }
-        }
-    }
-
-    /**
-     * Send a request to get a new partition assignment. This is a non-blocking call which sends
-     * a JoinGroup request to the coordinator (if it is available). The returned future must
-     * be polled to see if the request completed successfully.
-     * @return A request future whose completion indicates the result of the JoinGroup request.
-     */
-    private RequestFuture<Void> sendJoinGroupRequest() {
-        if (coordinatorUnknown())
-            return RequestFuture.coordinatorNotAvailable();
-
-        // send a join group request to the coordinator
-        List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscription());
-        log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
-
-        JoinGroupRequest request = new JoinGroupRequest(groupId,
-                this.sessionTimeoutMs,
-                subscribedTopics,
-                this.consumerId,
-                this.assignmentStrategy);
-
-        // create the request for the coordinator
-        log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.consumerCoordinator.id());
-        return client.send(consumerCoordinator, ApiKeys.JOIN_GROUP, request)
-                .compose(new JoinGroupResponseHandler());
-    }
-
-    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, Void> {
-
-        @Override
-        public JoinGroupResponse parse(ClientResponse response) {
-            return new JoinGroupResponse(response.responseBody());
-        }
-
-        @Override
-        public void handle(JoinGroupResponse joinResponse, RequestFuture<Void> future) {
-            // process the response
-            short errorCode = joinResponse.errorCode();
-
-            if (errorCode == Errors.NONE.code()) {
-                Coordinator.this.consumerId = joinResponse.consumerId();
-                Coordinator.this.generation = joinResponse.generationId();
-
-                // set the flag to refresh last committed offsets
-                subscriptions.needRefreshCommits();
-
-                log.debug("Joined group: {}", joinResponse.toStruct());
-
-                // record re-assignment time
-                sensors.partitionReassignments.record(response.requestLatencyMs());
-
-                // update partition assignment
-                subscriptions.changePartitionAssignment(joinResponse.assignedPartitions());
-                heartbeatTask.reset();
-                future.complete(null);
-            } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
-                // reset the consumer id and retry immediately
-                Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
-                log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
-                        groupId);
-                future.raise(Errors.UNKNOWN_CONSUMER_ID);
-            } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                    || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                // re-discover the coordinator and retry with backoff
-                coordinatorDead();
-                log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
-                        groupId);
-                future.raise(Errors.forCode(errorCode));
-            } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
-                    || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
-                    || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
-                // log the error and re-throw the exception
-                Errors error = Errors.forCode(errorCode);
-                log.error("Attempt to join group {} failed due to: {}",
-                        groupId, error.exception().getMessage());
-                future.raise(error);
-            } else {
-                // unexpected error, throw the exception
-                future.raise(new KafkaException("Unexpected error in join group response: "
-                        + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
-            }
-        }
-    }
-
-    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        this.subscriptions.needRefreshCommits();
-        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
-        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
-        future.addListener(new RequestFutureListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {
-                cb.onComplete(offsets, null);
-            }
-
-            @Override
-            public void onFailure(RuntimeException e) {
-                cb.onComplete(offsets, e);
-            }
-        });
-    }
-
-    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        if (offsets.isEmpty())
-            return;
-
-        while (true) {
-            ensureCoordinatorKnown();
-
-            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
-            client.poll(future);
-
-            if (future.succeeded()) {
-                return;
-            }
-
-            if (!future.isRetriable()) {
-                throw future.exception();
-            }
-
-            Utils.sleep(retryBackoffMs);
-        }
-    }
-
-    private void scheduleAutoCommitTask(final long interval) {
-        DelayedTask task = new DelayedTask() {
-            public void run(long now) {
-                commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
-                    @Override
-                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                        if (exception != null)
-                            log.error("Auto offset commit failed.", exception);
-                    }
-                });
-                client.schedule(this, now + interval);
-            }
-        };
-        client.schedule(task, time.milliseconds() + interval);
-    }
-
-    private void maybeAutoCommitOffsetsSync() {
-        if (autoCommitEnabled) {
-            try {
-                commitOffsetsSync(subscriptions.allConsumed());
-            } catch (ConsumerWakeupException e) {
-                // rethrow wakeups since they are triggered by the user
-                throw e;
-            } catch (Exception e) {
-                // consistent with async auto-commit failures, we do not propagate the exception
-                log.error("Auto offset commit failed.", e);
-            }
-        }
-    }
-
-    /**
-     * Reset the generation/consumerId tracked by this consumer.
-     */
-    public void resetGeneration() {
-        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
-        this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
-    }
-
-    /**
-     * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
-     * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
-     * asynchronous case.
-     *
-     * @param offsets The list of offsets per partition that should be committed.
-     * @return A request future whose value indicates whether the commit was successful or not
-     */
-    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        if (coordinatorUnknown())
-            return RequestFuture.coordinatorNotAvailable();
-
-        if (offsets.isEmpty())
-            return RequestFuture.voidSuccess();
-
-        // create the offset commit request
-        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
-        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
-            OffsetAndMetadata offsetAndMetadata = entry.getValue();
-            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
-                    offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
-        }
-
-        OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
-                this.generation,
-                this.consumerId,
-                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
-                offsetData);
-
-        return client.send(consumerCoordinator, ApiKeys.OFFSET_COMMIT, req)
-                .compose(new OffsetCommitResponseHandler(offsets));
-    }
-
-    public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
-        @Override
-        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-            if (exception != null)
-                log.error("Offset commit failed.", exception);
-        }
-    }
-
-    private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
-
-        private final Map<TopicPartition, OffsetAndMetadata> offsets;
-
-        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
-            this.offsets = offsets;
-        }
-
-        @Override
-        public OffsetCommitResponse parse(ClientResponse response) {
-            return new OffsetCommitResponse(response.responseBody());
-        }
-
-        @Override
-        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
-            sensors.commitLatency.record(response.requestLatencyMs());
-            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
-                long offset = offsetAndMetadata.offset();
-
-                short errorCode = entry.getValue();
-                if (errorCode == Errors.NONE.code()) {
-                    log.debug("Committed offset {} for partition {}", offset, tp);
-                    if (subscriptions.isAssigned(tp))
-                        // update the local cache only if the partition is still assigned
-                        subscriptions.committed(tp, offsetAndMetadata);
-                } else {
-                    if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                            || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                        coordinatorDead();
-                    } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
-                            || errorCode == Errors.ILLEGAL_GENERATION.code()) {
-                        // need to re-join group
-                        subscriptions.needReassignment();
-                    }
-
-                    log.error("Error committing partition {} at offset {}: {}",
-                            tp,
-                            offset,
-                            Errors.forCode(errorCode).exception().getMessage());
-
-                    future.raise(Errors.forCode(errorCode));
-                    return;
-                }
-            }
-
-            future.complete(null);
-        }
-    }
-
-    /**
-     * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
-     * returned future can be polled to get the actual offsets returned from the broker.
-     *
-     * @param partitions The set of partitions to get offsets for.
-     * @return A request future containing the committed offsets.
-     */
-    private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
-        if (coordinatorUnknown())
-            return RequestFuture.coordinatorNotAvailable();
-
-        log.debug("Fetching committed offsets for partitions: {}",  Utils.join(partitions, ", "));
-        // construct the request
-        OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
-
-        // send the request with a callback
-        return client.send(consumerCoordinator, ApiKeys.OFFSET_FETCH, request)
-                .compose(new OffsetFetchResponseHandler());
-    }
-
-    private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
-
-        @Override
-        public OffsetFetchResponse parse(ClientResponse response) {
-            return new OffsetFetchResponse(response.responseBody());
-        }
-
-        @Override
-        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
-            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                OffsetFetchResponse.PartitionData data = entry.getValue();
-                if (data.hasError()) {
-                    log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
-                            .exception()
-                            .getMessage());
-                    if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
-                        // just retry
-                        future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
-                    } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                        // re-discover the coordinator and retry
-                        coordinatorDead();
-                        future.raise(Errors.NOT_COORDINATOR_FOR_CONSUMER);
-                    } else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
-                            || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
-                        // need to re-join group
-                        subscriptions.needReassignment();
-                        future.raise(Errors.forCode(data.errorCode));
-                    } else {
-                        future.raise(new KafkaException("Unexpected error in fetch offset response: "
-                                + Errors.forCode(data.errorCode).exception().getMessage()));
-                    }
-                    return;
-                } else if (data.offset >= 0) {
-                    // record the position with the offset (-1 indicates no committed offset to fetch)
-                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
-                } else {
-                    log.debug("No committed offset for partition " + tp);
-                }
-            }
-
-            future.complete(offsets);
-        }
-    }
-
-    /**
-     * Send a heartbeat request now (visible only for testing).
-     */
-    public RequestFuture<Void> sendHeartbeatRequest() {
-        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId);
-        return client.send(consumerCoordinator, ApiKeys.HEARTBEAT, req)
-                .compose(new HeartbeatCompletionHandler());
-    }
-
-    public boolean coordinatorUnknown() {
-        return this.consumerCoordinator == null;
-    }
-
-    /**
-     * Discover the current coordinator for the consumer group. Sends a ConsumerMetadata request to
-     * one of the brokers. The returned future should be polled to get the result of the request.
-     * @return A request future which indicates the completion of the metadata request
-     */
-    private RequestFuture<Void> sendConsumerMetadataRequest() {
-        // initiate the consumer metadata request
-        // find a node to ask about the coordinator
-        Node node = this.client.leastLoadedNode();
-        if (node == null) {
-            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
-            // from configuration?
-            return RequestFuture.noBrokersAvailable();
-        } else {
-            // create a consumer metadata request
-            log.debug("Issuing consumer metadata request to broker {}", node.id());
-            ConsumerMetadataRequest metadataRequest = new ConsumerMetadataRequest(this.groupId);
-            return client.send(node, ApiKeys.CONSUMER_METADATA, metadataRequest)
-                    .compose(new RequestFutureAdapter<ClientResponse, Void>() {
-                        @Override
-                        public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
-                            handleConsumerMetadataResponse(response, future);
-                        }
-                    });
-        }
-    }
-
-    private void handleConsumerMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
-        log.debug("Consumer metadata response {}", resp);
-
-        // parse the response to get the coordinator info if it is not disconnected,
-        // otherwise we need to request metadata update
-        if (resp.wasDisconnected()) {
-            future.raise(new DisconnectException());
-        } else if (!coordinatorUnknown()) {
-            // We already found the coordinator, so ignore the request
-            future.complete(null);
-        } else {
-            ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(resp.responseBody());
-            // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
-            // for the coordinator in the underlying network client layer
-            // TODO: this needs to be better handled in KAFKA-1935
-            if (consumerMetadataResponse.errorCode() == Errors.NONE.code()) {
-                this.consumerCoordinator = new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(),
-                        consumerMetadataResponse.node().host(),
-                        consumerMetadataResponse.node().port());
-                heartbeatTask.reset();
-                future.complete(null);
-            } else {
-                future.raise(Errors.forCode(consumerMetadataResponse.errorCode()));
-            }
-        }
-    }
-
-    /**
-     * Mark the current coordinator as dead.
-     */
-    private void coordinatorDead() {
-        if (this.consumerCoordinator != null) {
-            log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id());
-            this.consumerCoordinator = null;
-        }
-    }
-
-    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
-        @Override
-        public HeartbeatResponse parse(ClientResponse response) {
-            return new HeartbeatResponse(response.responseBody());
-        }
-
-        @Override
-        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
-            sensors.heartbeatLatency.record(response.requestLatencyMs());
-            short error = heartbeatResponse.errorCode();
-            if (error == Errors.NONE.code()) {
-                log.debug("Received successful heartbeat response.");
-                future.complete(null);
-            } else if (error == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                    || error == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
-                log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
-                coordinatorDead();
-                future.raise(Errors.forCode(error));
-            } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) {
-                log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
-                subscriptions.needReassignment();
-                future.raise(Errors.REBALANCE_IN_PROGRESS);
-            } else if (error == Errors.ILLEGAL_GENERATION.code()) {
-                log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
-                subscriptions.needReassignment();
-                future.raise(Errors.ILLEGAL_GENERATION);
-            } else if (error == Errors.UNKNOWN_CONSUMER_ID.code()) {
-                log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
-                consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
-                subscriptions.needReassignment();
-                future.raise(Errors.UNKNOWN_CONSUMER_ID);
-            } else {
-                future.raise(new KafkaException("Unexpected error in heartbeat response: "
-                        + Errors.forCode(error).exception().getMessage()));
-            }
-        }
-    }
-
-    private abstract class CoordinatorResponseHandler<R, T>
-            extends RequestFutureAdapter<ClientResponse, T> {
-        protected ClientResponse response;
-
-        public abstract R parse(ClientResponse response);
-
-        public abstract void handle(R response, RequestFuture<T> future);
-
-        @Override
-        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
-            this.response = clientResponse;
-
-            if (clientResponse.wasDisconnected()) {
-                int correlation = response.request().request().header().correlationId();
-                log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
-                        response.request(),
-                        correlation,
-                        response.request().request().destination());
-
-                // mark the coordinator as dead
-                coordinatorDead();
-                future.raise(new DisconnectException());
-                return;
-            }
-
-            R response = parse(clientResponse);
-            handle(response, future);
-        }
-
-        @Override
-        public void onFailure(RuntimeException e, RequestFuture<T> future) {
-            if (e instanceof DisconnectException) {
-                log.debug("Coordinator request failed", e);
-                coordinatorDead();
-            }
-            future.raise(e);
-        }
-    }
-
-
-    private class CoordinatorMetrics {
-        public final Metrics metrics;
-        public final String metricGrpName;
-
-        public final Sensor commitLatency;
-        public final Sensor heartbeatLatency;
-        public final Sensor partitionReassignments;
-
-        public CoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
-            this.metrics = metrics;
-            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
-
-            this.commitLatency = metrics.sensor("commit-latency");
-            this.commitLatency.add(new MetricName("commit-latency-avg",
-                this.metricGrpName,
-                "The average time taken for a commit request",
-                tags), new Avg());
-            this.commitLatency.add(new MetricName("commit-latency-max",
-                this.metricGrpName,
-                "The max time taken for a commit request",
-                tags), new Max());
-            this.commitLatency.add(new MetricName("commit-rate",
-                this.metricGrpName,
-                "The number of commit calls per second",
-                tags), new Rate(new Count()));
-
-            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
-            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
-                this.metricGrpName,
-                "The max time taken to receive a response to a hearbeat request",
-                tags), new Max());
-            this.heartbeatLatency.add(new MetricName("heartbeat-rate",
-                this.metricGrpName,
-                "The average number of heartbeats per second",
-                tags), new Rate(new Count()));
-
-            this.partitionReassignments = metrics.sensor("reassignment-latency");
-            this.partitionReassignments.add(new MetricName("reassignment-time-avg",
-                this.metricGrpName,
-                "The average time taken for a partition reassignment",
-                tags), new Avg());
-            this.partitionReassignments.add(new MetricName("reassignment-time-max",
-                this.metricGrpName,
-                "The max time taken for a partition reassignment",
-                tags), new Max());
-            this.partitionReassignments.add(new MetricName("reassignment-rate",
-                this.metricGrpName,
-                "The number of partition reassignments per second",
-                tags), new Rate(new Count()));
-
-            Measurable numParts =
-                new Measurable() {
-                    public double measure(MetricConfig config, long now) {
-                        return subscriptions.assignedPartitions().size();
-                    }
-                };
-            metrics.addMetric(new MetricName("assigned-partitions",
-                this.metricGrpName,
-                "The number of partitions currently assigned to this consumer",
-                tags),
-                numParts);
-
-            Measurable lastHeartbeat =
-                new Measurable() {
-                    public double measure(MetricConfig config, long now) {
-                        return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
-                    }
-                };
-            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
-                this.metricGrpName,
-                "The number of seconds since the last controller heartbeat",
-                tags),
-                lastHeartbeat);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7e55d46..f119552 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -167,20 +167,31 @@ public class Fetcher<K, V> {
         }
     }
 
-
+    /**
+     * Get topic metadata for all topics in the cluster
+     * @param timeout maximum time in milliseconds to spend fetching the metadata
+     * @return The map of topics with their partition information
+     */
+    public Map<String, List<PartitionInfo>> getAllTopicMetadata(long timeout) {
+        return getTopicMetadata(null, timeout);
+    }
 
     /**
      * Get metadata for all topics present in Kafka cluster
      *
-     * @param timeout time for which getting all topics is attempted
-     * @return The map of topics and its partitions
+     * @param topics The list of topics to fetch metadata for, or null to fetch all topics
+     * @param timeout maximum time in milliseconds to spend fetching the metadata
+     * @return The map of topics with their partition information
      */
-    public Map<String, List<PartitionInfo>> getAllTopics(long timeout) {
+    public Map<String, List<PartitionInfo>> getTopicMetadata(List<String> topics, long timeout) {
+        if (topics != null && topics.isEmpty())
+            return Collections.emptyMap();
+
         final HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
         long startTime = time.milliseconds();
 
         while (time.milliseconds() - startTime < timeout) {
-            RequestFuture<ClientResponse> requestFuture = sendMetadataRequest();
+            RequestFuture<ClientResponse> requestFuture = sendMetadataRequest(topics);
             if (requestFuture != null) {
                 client.poll(requestFuture);
 
@@ -209,11 +220,12 @@ public class Fetcher<K, V> {
      * Send Metadata Request to least loaded node in Kafka cluster asynchronously
      * @return A future that indicates result of sent metadata request
      */
-    public RequestFuture<ClientResponse> sendMetadataRequest() {
+    public RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) {
+        if (topics == null)
+            topics = Collections.emptyList();
         final Node node = client.leastLoadedNode();
         return node == null ? null :
-            client.send(
-                node, ApiKeys.METADATA, new MetadataRequest(Collections.<String>emptyList()));
+            client.send(node, ApiKeys.METADATA, new MetadataRequest(topics));
     }
 
     /**
@@ -448,8 +460,9 @@ public class Fetcher<K, V> {
                 long fetched = this.subscriptions.fetched(partition);
                 long consumed = this.subscriptions.consumed(partition);
                 // Only fetch data for partitions whose previously fetched data has been consumed
-                if (consumed == fetched)
+                if (consumed == fetched) {
                     fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize));
+                }
             }
         }
 

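As a usage sketch for the metadata methods above (illustrative only; the Fetcher is normally constructed by KafkaConsumer with its network client and other collaborators, so an already-built instance is assumed here):

import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.common.PartitionInfo;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class TopicMetadataSketch {
    // Hypothetical helper: assumes a Fetcher built elsewhere with its usual collaborators.
    static void printMetadata(Fetcher<byte[], byte[]> fetcher) {
        // A null topic list means "fetch metadata for all topics".
        Map<String, List<PartitionInfo>> all = fetcher.getAllTopicMetadata(5000L);
        System.out.println("all topics: " + all.keySet());

        // A specific (non-null) list fetches only those topics; an empty list
        // short-circuits to an empty map without a network round trip.
        Map<String, List<PartitionInfo>> some =
                fetcher.getTopicMetadata(Arrays.asList("foo", "bar"), 5000L);
        System.out.println("selected: " + some);
    }
}
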
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
new file mode 100644
index 0000000..46bfa75
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This interface is used to define custom partition assignment for use in
+ * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
+ * to the topics they are interested in and forward their subscriptions to a Kafka broker serving
+ * as the group coordinator. The coordinator selects one member to perform the group assignment and
+ * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, Map)} is called
+ * to perform the assignment and the results are forwarded back to each respective member.
+ *
+ * In some cases, it is useful to forward additional metadata to the assignor in order to make
+ * assignment decisions. For this, you can override {@link #subscription(Set)} and provide custom
+ * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
+ * can use this user data to forward the rackId belonging to each member.
+ */
+public interface PartitionAssignor {
+
+    /**
+     * Return a serializable object representing the local member's subscription. This can include
+     * additional information as well (e.g. local host/rack information) which can be leveraged in
+     * {@link #assign(Cluster, Map)}.
+     * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(List)}
+     *               and variants
+     * @return Non-null subscription with optional user data
+     */
+    Subscription subscription(Set<String> topics);
+
+    /**
+     * Perform the group assignment given the member subscriptions and current cluster metadata.
+     * @param metadata Current topic/broker metadata known by consumer
+     * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)}
+     * @return A map from each member to its respective assignment. This should have one entry
+     *         for each member in the input subscription map.
+     */
+    Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);
+
+
+    /**
+     * Callback which is invoked when a group member receives its assignment from the leader.
+     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)}
+     */
+    void onAssignment(Assignment assignment);
+
+
+    /**
+     * Unique name for this assignor (e.g. "range" or "roundrobin")
+     * @return non-null unique name
+     */
+    String name();
+
+    class Subscription {
+        private final List<String> topics;
+        private final ByteBuffer userData;
+
+        public Subscription(List<String> topics, ByteBuffer userData) {
+            this.topics = topics;
+            this.userData = userData;
+        }
+
+        public Subscription(List<String> topics) {
+            this(topics, ByteBuffer.wrap(new byte[0]));
+        }
+
+        public List<String> topics() {
+            return topics;
+        }
+
+        public ByteBuffer userData() {
+            return userData;
+        }
+
+    }
+
+    class Assignment {
+        private final List<TopicPartition> partitions;
+        private final ByteBuffer userData;
+
+        public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
+            this.partitions = partitions;
+            this.userData = userData;
+        }
+
+        public Assignment(List<TopicPartition> partitions) {
+            this(partitions, ByteBuffer.wrap(new byte[0]));
+        }
+
+        public List<TopicPartition> partitions() {
+            return partitions;
+        }
+
+        public ByteBuffer userData() {
+            return userData;
+        }
+
+    }
+
+}
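
To make the contract concrete, below is an illustrative implementation sketch (not part of this patch): a deliberately naive assignor that hands each member every partition of every topic it subscribed to, which is only sensible when members subscribe to disjoint topics. Real assignors such as range or round-robin balance partitions across members instead:

import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NaiveAssignor implements PartitionAssignor {

    @Override
    public Subscription subscription(Set<String> topics) {
        // No custom user data; just forward the subscribed topics.
        return new Subscription(new ArrayList<>(topics));
    }

    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        Map<String, Assignment> assignments = new HashMap<>();
        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (String topic : entry.getValue().topics()) {
                // Expand each subscribed topic into its partitions from cluster metadata.
                List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
                if (infos != null) {
                    for (PartitionInfo info : infos)
                        partitions.add(new TopicPartition(info.topic(), info.partition()));
                }
            }
            assignments.put(entry.getKey(), new Assignment(partitions));
        }
        return assignments;
    }

    @Override
    public void onAssignment(Assignment assignment) {
        // No local state to update in this sketch.
    }

    @Override
    public String name() {
        return "naive";
    }
}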

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index f5c1afc..7be99bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -175,6 +175,20 @@ public class RequestFuture<T> {
         return adapted;
     }
 
+    public void chain(final RequestFuture<T> future) {
+        addListener(new RequestFutureListener<T>() {
+            @Override
+            public void onSuccess(T value) {
+                future.complete(value);
+            }
+
+            @Override
+            public void onFailure(RuntimeException e) {
+                future.raise(e);
+            }
+        });
+    }
+
     public static <T> RequestFuture<T> failure(RuntimeException e) {
         RequestFuture<T> future = new RequestFuture<T>();
         future.raise(e);
@@ -188,7 +202,7 @@ public class RequestFuture<T> {
     }
 
     public static <T> RequestFuture<T> coordinatorNotAvailable() {
-        return failure(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
+        return failure(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
     }
 
     public static <T> RequestFuture<T> leaderNotAvailable() {

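
The new chain method is a small composition helper: success or failure of the receiver is
propagated verbatim into the chained future. A sketch of the intended usage, assuming the
public no-arg constructor that the static factories in this file already use:

    import org.apache.kafka.clients.consumer.internals.RequestFuture;

    public class ChainExample {
        public static void main(String[] args) {
            // future handed to the caller before the real request can even be sent
            RequestFuture<Void> outer = new RequestFuture<>();

            // future tracking the actual request once it is in flight
            RequestFuture<Void> inner = new RequestFuture<>();

            // from here on, whatever happens to 'inner' also happens to 'outer'
            inner.chain(outer);

            inner.complete(null); // completes 'outer' with the same value
        }
    }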
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 25a0e90..6e79a7f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -29,7 +29,7 @@ import java.util.regex.Pattern;
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer. A partition
  * is "assigned" either directly with {@link #assign(List)} (manual assignment)
- * or with {@link #changePartitionAssignment(List)} (automatic assignment).
+ * or with {@link #changePartitionAssignment(Collection)} (automatic assignment).
  *
  * Once assigned, the partition is not considered "fetchable" until its initial position has
  * been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
@@ -54,6 +54,9 @@ public class SubscriptionState {
     /* the list of topics the user has requested */
     private final Set<String> subscription;
 
+    /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
+    private final Set<String> groupSubscription;
+
     /* the list of partitions the user has requested */
     private final Set<TopicPartition> userAssignment;
 
@@ -80,6 +83,7 @@ public class SubscriptionState {
         this.subscription = new HashSet<>();
         this.userAssignment = new HashSet<>();
         this.assignment = new HashMap<>();
+        this.groupSubscription = new HashSet<>();
         this.needsPartitionAssignment = false;
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
         this.subscribedPattern = null;
@@ -101,6 +105,7 @@ public class SubscriptionState {
         if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) {
             this.subscription.clear();
             this.subscription.addAll(topicsToSubscribe);
+            this.groupSubscription.addAll(topicsToSubscribe);
             this.needsPartitionAssignment = true;
 
             // Remove any assigned partitions which are no longer subscribed to
@@ -110,10 +115,22 @@ public class SubscriptionState {
                     it.remove();
             }
         }
+    }
 
+    /**
+     * Add topics to the current group subscription. This is used by the group leader to ensure
+     * that it receives metadata updates for all topics that the group is interested in.
+     * @param topics The topics to add to the group subscription
+     */
+    public void groupSubscribe(Collection<String> topics) {
+        if (!this.userAssignment.isEmpty())
+            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
+        this.groupSubscription.addAll(topics);
     }
 
     public void needReassignment() {
+        // prune the group subscription down to the local member's own subscription
+        this.groupSubscription.retainAll(subscription);
         this.needsPartitionAssignment = true;
     }
 
@@ -142,6 +159,10 @@ public class SubscriptionState {
         this.subscribedPattern = pattern;
     }
 
+    public boolean hasPatternSubscription() {
+        return subscribedPattern != null;
+    }
+
     public void unsubscribe() {
         this.subscription.clear();
         this.assignment.clear();
@@ -154,15 +175,24 @@ public class SubscriptionState {
         return this.subscribedPattern;
     }
 
-    public void clearAssignment() {
-        this.assignment.clear();
-        this.needsPartitionAssignment = !subscription().isEmpty();
-    }
-
     public Set<String> subscription() {
         return this.subscription;
     }
 
+    /**
+     * Get the subscription for the group. For the leader, this will include the union of the
+     * subscriptions of all group members. For followers, it is just that member's subscription.
+     * This is used when querying topic metadata to detect metadata changes that would
+     * require rebalancing. The leader fetches metadata for all topics in the group so that it
+     * can do the partition assignment (which requires at least partition counts for all topics
+     * to be assigned).
+     * @return The union of all subscribed topics in the group if this member is the leader
+     *   of the current generation; otherwise it returns the same set as {@link #subscription()}
+     */
+    public Set<String> groupSubscription() {
+        return this.groupSubscription;
+    }
+
     public Long fetched(TopicPartition tp) {
         return assignedState(tp).fetched;
     }
@@ -280,7 +310,7 @@ public class SubscriptionState {
         for (TopicPartition tp : assignments)
             if (!this.subscription.contains(tp.topic()))
                 throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
-        this.clearAssignment();
+        this.assignment.clear();
         for (TopicPartition tp: assignments)
             addAssignedPartition(tp);
         this.needsPartitionAssignment = false;

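
The new groupSubscription set diverges from subscription only on the leader: after a
JoinGroup completes, the leader folds in every member's topics so that its next metadata
fetch covers the whole group, and needReassignment prunes the union back again. A sketch
of that flow, with hypothetical topic names and assuming the single-argument constructor
of this version of SubscriptionState:

    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.clients.consumer.internals.SubscriptionState;

    import java.util.Arrays;
    import java.util.Set;

    public class GroupSubscriptionExample {
        public static void main(String[] args) {
            SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);

            // leader only: add the other members' topics once JoinGroup completes
            state.groupSubscribe(Arrays.asList("orders", "payments"));

            // metadata refreshes use the union, so partition counts are known
            // for every topic the leader has to assign
            Set<String> metadataTopics = state.groupSubscription();
            System.out.println(metadataTopics);

            // the next rebalance drops topics held only on behalf of other members
            state.needReassignment();
        }
    }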
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 60594a7..e6a2e43 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -173,6 +173,16 @@ public final class Cluster {
     }
 
     /**
+     * Get the number of partitions for the given topic
+     * @param topic The topic to get the number of partitions for
+     * @return The number of partitions or null if there is no corresponding metadata
+     */
+    public Integer partitionCountForTopic(String topic) {
+        List<PartitionInfo> partitionInfos = this.partitionsByTopic.get(topic);
+        return partitionInfos == null ? null : partitionInfos.size();
+    }
+
+    /**
      * Get all topics.
      * @return a set of all topics
      */

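
partitionCountForTopic is a convenience for assignors, which usually need only the count
rather than the full PartitionInfo list; a null return means the cluster metadata does not
(yet) cover the topic. For example, inside an assign implementation (topic name hypothetical):

    // 'metadata' is the Cluster passed into PartitionAssignor.assign
    Integer partitionCount = metadata.partitionCountForTopic("orders");
    if (partitionCount == null) {
        // no metadata for the topic yet; leave it out of this assignment round
    } else {
        for (int p = 0; p < partitionCount; p++) {
            // ... hand new TopicPartition("orders", p) to some member ...
        }
    }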
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
deleted file mode 100644
index ba9ce82..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.common.errors;
-
-/**
- * The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has
- * not yet been created.
- */
-public class ConsumerCoordinatorNotAvailableException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public ConsumerCoordinatorNotAvailableException() {
-        super();
-    }
-
-    public ConsumerCoordinatorNotAvailableException(String message) {
-        super(message);
-    }
-
-    public ConsumerCoordinatorNotAvailableException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ConsumerCoordinatorNotAvailableException(Throwable cause) {
-        super(cause);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
new file mode 100644
index 0000000..c0949e3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * The broker returns this error code for group metadata requests or offset commit requests if the offsets topic has
+ * not yet been created.
+ */
+public class GroupCoordinatorNotAvailableException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public GroupCoordinatorNotAvailableException() {
+        super();
+    }
+
+    public GroupCoordinatorNotAvailableException(String message) {
+        super(message);
+    }
+
+    public GroupCoordinatorNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public GroupCoordinatorNotAvailableException(Throwable cause) {
+        super(cause);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
deleted file mode 100644
index b6c83b4..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.common.errors;
-
-/**
- * The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is
- * not a coordinator for.
- */
-public class NotCoordinatorForConsumerException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public NotCoordinatorForConsumerException() {
-        super();
-    }
-
-    public NotCoordinatorForConsumerException(String message) {
-        super(message);
-    }
-
-    public NotCoordinatorForConsumerException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public NotCoordinatorForConsumerException(Throwable cause) {
-        super(cause);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
new file mode 100644
index 0000000..bc56eb0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is
+ * not the coordinator for.
+ */
+public class NotCoordinatorForGroupException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NotCoordinatorForGroupException() {
+        super();
+    }
+
+    public NotCoordinatorForGroupException(String message) {
+        super(message);
+    }
+
+    public NotCoordinatorForGroupException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NotCoordinatorForGroupException(Throwable cause) {
+        super(cause);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
deleted file mode 100644
index 28bfd72..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-public class UnknownConsumerIdException extends ApiException {
-    private static final long serialVersionUID = 1L;
-
-    public UnknownConsumerIdException() {
-        super();
-    }
-
-    public UnknownConsumerIdException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public UnknownConsumerIdException(String message) {
-        super(message);
-    }
-
-    public UnknownConsumerIdException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/UnknownMemberIdException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownMemberIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownMemberIdException.java
new file mode 100644
index 0000000..f8eab90
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownMemberIdException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class UnknownMemberIdException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public UnknownMemberIdException() {
+        super();
+    }
+
+    public UnknownMemberIdException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public UnknownMemberIdException(String message) {
+        super(message);
+    }
+
+    public UnknownMemberIdException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index fab8b02..af7b266 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -30,10 +30,11 @@ public enum ApiKeys {
     CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
     OFFSET_COMMIT(8, "OffsetCommit"),
     OFFSET_FETCH(9, "OffsetFetch"),
-    CONSUMER_METADATA(10, "ConsumerMetadata"),
+    GROUP_METADATA(10, "GroupMetadata"),
     JOIN_GROUP(11, "JoinGroup"),
     HEARTBEAT(12, "Heartbeat"),
-    LEAVE_GROUP(13, "LeaveGroup");
+    LEAVE_GROUP(13, "LeaveGroup"),
+    SYNC_GROUP(14, "SyncGroup");
 
     private static ApiKeys[] codeToType;
     public static final int MAX_API_KEY;

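
SYNC_GROUP takes the next free id (14), so MAX_API_KEY and the codeToType table below the
enum pick it up automatically. Ids read off the wire resolve back through the enum, e.g.
(assuming the forId lookup that codeToType backs):

    import org.apache.kafka.common.protocol.ApiKeys;

    public class ApiKeyLookup {
        public static void main(String[] args) {
            ApiKeys key = ApiKeys.forId(14);
            System.out.println(key.name + " (id=" + key.id + ")"); // SyncGroup (id=14)
        }
    }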
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 220132f..3191636 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -60,10 +60,10 @@ public enum Errors {
             new NetworkException("The server disconnected before a response was received.")),
     OFFSET_LOAD_IN_PROGRESS(14,
             new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")),
-    CONSUMER_COORDINATOR_NOT_AVAILABLE(15,
-            new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")),
-    NOT_COORDINATOR_FOR_CONSUMER(16,
-            new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")),
+    GROUP_COORDINATOR_NOT_AVAILABLE(15,
+            new GroupCoordinatorNotAvailableException("The group coordinator is not available.")),
+    NOT_COORDINATOR_FOR_GROUP(16,
+            new NotCoordinatorForGroupException("This is not the correct coordinator for this group.")),
     INVALID_TOPIC_EXCEPTION(17,
             new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
     RECORD_LIST_TOO_LARGE(18,
@@ -75,17 +75,13 @@ public enum Errors {
     INVALID_REQUIRED_ACKS(21,
             new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
     ILLEGAL_GENERATION(22,
-            new IllegalGenerationException("Specified consumer generation id is not valid.")),
-    INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23,
-            new ApiException("The request partition assignment strategy does not match that of the group.")),
-    UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24,
-            new ApiException("The request partition assignment strategy is unknown to the broker.")),
-    UNKNOWN_CONSUMER_ID(25,
-            new UnknownConsumerIdException("The coordinator is not aware of this consumer.")),
+            new IllegalGenerationException("Specified group generation id is not valid.")),
+    INCONSISTENT_GROUP_PROTOCOL(23,
+            new ApiException("The group member's supported protocols are incompatible with those of existing members.")),
+    UNKNOWN_MEMBER_ID(25,
+            new UnknownMemberIdException("The coordinator is not aware of this member.")),
     INVALID_SESSION_TIMEOUT(26,
             new ApiException("The session timeout is not within an acceptable range.")),
-    COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
-            new ApiException("Some of the committing partitions are not assigned the committer")),
     INVALID_COMMIT_OFFSET_SIZE(28,
             new ApiException("The committing offset data size is not valid")),
     AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")),

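
Both renamed errors keep their codes (15 and 16) and remain RetriableException subclasses,
so existing retry handling carries over unchanged; note that code 24 is now unused and code
27 has been dropped. A sketch of translating a response's error code back through this enum,
assuming the forCode lookup (code value hypothetical):

    import org.apache.kafka.common.errors.RetriableException;
    import org.apache.kafka.common.protocol.Errors;

    public class ErrorHandling {
        public static void main(String[] args) {
            short code = 15; // e.g. read from a coordinator response
            Errors error = Errors.forCode(code);
            if (error != Errors.NONE) {
                if (error.exception() instanceof RetriableException) {
                    // retriable: rediscover the group coordinator and resend
                    System.out.println("retriable: " + error.exception().getMessage());
                } else {
                    throw error.exception();
                }
            }
        }
    }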
