kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and metadata/subscription change (#6884)
Date Thu, 08 Aug 2019 21:31:41 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e867a58  KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and metadata/subscription change (#6884)
e867a58 is described below

commit e867a58425876767b952e06892c72b5e13066acc
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Aug 8 14:31:22 2019 -0700

    KAFKA-8179: Part 3, Add PartitionsLost API for resetGenerations and metadata/subscription change (#6884)
    
    1. Add onPartitionsLost to the RebalanceListener; it is triggered when the consumer finds that its generation has been reset due to fatal errors in response handling.
    
    2. Semantic behavior change: with the COOPERATIVE protocol, if the revoked / lost partitions are empty, do not trigger the corresponding callback at all. For added partitions, however, the callback is still triggered even when the collection is empty, as a way to signal that a rebalance event has completed; with the EAGER protocol, the revoked / assigned callbacks are always triggered.
    
    The ordering of the callbacks is as follows:
    
    a. Callback onPartitionsRevoked / onPartitionsLost triggered.
    b. Update the assignment (both revoked and added).
    c. Callback onPartitionsAssigned triggered.
    
    In this way users can still access the partitions being revoked in the first callback, and can also access the partitions being added in the last.
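
    As an illustration (not part of this commit), a listener could distinguish the three callbacks
    roughly as sketched below; OffsetTrackingListener and its saveOffsets / clearLocalState /
    restoreOffsets helpers are hypothetical placeholders for application-specific logic:

        import java.util.Collection;

        import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
        import org.apache.kafka.common.TopicPartition;

        public class OffsetTrackingListener implements ConsumerRebalanceListener {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // graceful hand-off: the partitions are still owned here, so offsets can be saved or committed
                saveOffsets(partitions);
            }

            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                // these partitions may already be owned by other members; do not commit,
                // only release local resources tied to them
                clearLocalState(partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // invoked exactly once per completed rebalance, possibly with an empty collection
                restoreOffsets(partitions);
            }

            private void saveOffsets(Collection<TopicPartition> partitions) { /* application-specific */ }
            private void clearLocalState(Collection<TopicPartition> partitions) { /* application-specific */ }
            private void restoreOffsets(Collection<TopicPartition> partitions) { /* application-specific */ }
        }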
    
    3. Semantic behavior change (KAFKA-4600): if the rebalance listener throws an exception, pass it all the way up to the consumer.poll caller, but still complete the rest of the rebalance actions. The newly assigned partitions list is not affected by the thrown exception, since the callback is only a notification to users.
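
    For example (illustrative only; the process() helper and log field are hypothetical, and the usual
    consumer imports are assumed), a caller would observe such a listener failure as a KafkaException
    thrown from poll():

        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            process(records);
        } catch (KafkaException e) {
            // an exception thrown from the rebalance listener surfaces here; the rebalance itself
            // has still completed, and the failed callback will not be retried
            log.error("Rebalance callback failed", e);
        }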
    
    4. Semantic behavior change: the ConsumerCoordinator no longer tries to modify the assignor's returned assignments; instead it validates those assignments and sets the error code accordingly: if there is any overlap between the added and revoked partitions (i.e. an owned partition would be reassigned directly to another member), it is a fatal error that is communicated to all members so that they stop; if the revoked set is non-empty, it is an error indicating that a re-join is needed; otherwise, it is normal. For example, to move a partition from consumer A to consumer B under COOPERATIVE, the assignor must first leave it unassigned (revoking it from A) and only assign it to B in a follow-up rebalance.
    
    5. Minor: with the error code removed from the Assignment, the ConsumerCoordinator will request a re-join if the revoked partitions list is not empty.
    
    6. Updated ConsumerCoordinatorTest accordingly. Also found a minor bug in MetadataUpdate where a removed topic would still be retained with a null value for its number of partitions.
    
    7. Updated a few other flaky tests that were exposed by this change.
    
    Reviewers: John Roesler <vvcephei@users.noreply.github.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../consumer/ConsumerPartitionAssignor.java        |  17 ++
 .../consumer/ConsumerRebalanceListener.java        |  98 +++++-
 .../kafka/clients/consumer/KafkaConsumer.java      |  19 +-
 .../consumer/internals/AbstractCoordinator.java    |  62 ++--
 .../consumer/internals/ConsumerCoordinator.java    | 332 +++++++++++++++------
 .../consumer/internals/PartitionAssignor.java      |   1 -
 .../consumer/internals/SubscriptionState.java      |  55 ++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  64 +++-
 .../clients/consumer/RoundRobinAssignorTest.java   |  20 +-
 .../internals/ConsumerCoordinatorTest.java         | 184 ++++++++----
 .../consumer/internals/SubscriptionStateTest.java  |  40 ++-
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |   2 +
 .../kafka/api/AbstractConsumerTest.scala           |  14 +-
 .../integration/kafka/api/ConsumerBounceTest.scala |  13 +-
 .../kafka/api/PlaintextConsumerTest.scala          |   8 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |   5 +-
 .../streams/processor/internals/StreamThread.java  |   5 +-
 .../StreamTableJoinIntegrationTest.java            |   5 +-
 .../integration/utils/IntegrationTestUtils.java    |  17 +-
 19 files changed, 686 insertions(+), 275 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index 07e153e..f9a4217 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -180,6 +180,23 @@ public interface ConsumerPartitionAssignor {
         }
     }
 
+    /**
+     * The rebalance protocol defines partition assignment and revocation semantics. The purpose is to establish a
+     * consistent set of rules that all consumers in a group follow in order to transfer ownership of a partition.
+     * {@link ConsumerPartitionAssignor} implementors can claim support for one or more rebalance protocols via
+     * {@link ConsumerPartitionAssignor#supportedProtocols()}, and it is their responsibility to respect the rules
+     * of those protocols in their {@link ConsumerPartitionAssignor#assign(Cluster, GroupSubscription)} implementations.
+     * Failure to follow the rules of the supported protocols could lead to runtime errors or undefined behavior.
+     *
+     * The {@link RebalanceProtocol#EAGER} rebalance protocol requires a consumer to always revoke all its owned
+     * partitions before participating in a rebalance event. It therefore allows a complete reshuffling of the assignment.
+     *
+     * The {@link RebalanceProtocol#COOPERATIVE} rebalance protocol allows a consumer to retain its currently owned
+     * partitions before participating in a rebalance event. The assignor should not reassign any owned partitions
+     * immediately, but instead may indicate to consumers the need for partition revocation so that the revoked
+     * partitions can be reassigned to other consumers in the next rebalance event. This is designed for sticky assignment
+     * logic which attempts to minimize partition reassignment with cooperative adjustments.
+     */
     enum RebalanceProtocol {
         EAGER((byte) 0), COOPERATIVE((byte) 1);
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index 74e8b06..6046ef9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import java.time.Duration;
 import java.util.Collection;
 
 import org.apache.kafka.common.TopicPartition;
@@ -29,7 +30,7 @@ import org.apache.kafka.common.TopicPartition;
  * <p>
  * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription
  * of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
- * Rebalances can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is
+ * Partition re-assignments can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is
  * administratively adjusted).
  * <p>
  * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
@@ -47,11 +48,43 @@ import org.apache.kafka.common.TopicPartition;
  * This callback will only execute in the user thread as part of the {@link Consumer#poll(java.time.Duration) poll(long)} call
  * whenever partition assignment changes.
  * <p>
- * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} prior to
- * any process invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
- * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
- * partition has their {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} callback called to load the state.
+ * Under normal conditions, if a partition is reassigned from one consumer to another, then the old consumer will
+ * always invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} for that partition prior to the new consumer
+ * invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} for the same partition. So if offsets or other state is saved in the
+ * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one consumer member, it will always be accessible by the time the
+ * other consumer member taking over that partition has its {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} callback triggered to load the state.
  * <p>
+ * You can think of revocation as a graceful way to give up ownership of a partition. In some cases, the consumer may not have an opportunity to do so.
+ * For example, if the session times out, then the partitions may be reassigned before we have a chance to revoke them gracefully.
+ * For this case, we have a third callback {@link #onPartitionsLost(Collection)}. The difference between this function and
+ * {@link #onPartitionsRevoked(Collection)} is that upon invocation of {@link #onPartitionsLost(Collection)}, the partitions
+ * may already be owned by some other members in the group and therefore users would not be able to commit their consumed offsets, for example.
+ * Users could implement these two functions differently (by default,
+ * {@link #onPartitionsLost(Collection)} simply calls {@link #onPartitionsRevoked(Collection)} directly); for example, in
+ * {@link #onPartitionsLost(Collection)} there is no need to store the offsets since we know these partitions are no longer owned by the consumer
+ * at that time.
+ * <p>
+ * During a rebalance event, the {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} function will always be triggered exactly once when
+ * the rebalance completes. That is, even if there are no newly assigned partitions for a consumer member, its {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}
+ * will still be triggered with an empty collection of partitions. As a result this callback can also serve as a notification that a rebalance event has happened.
+ * On the other hand, {@link #onPartitionsRevoked(Collection)} and {@link #onPartitionsLost(Collection)}
+ * will only be triggered when a non-empty set of partitions has been revoked or lost from this consumer member during a rebalance event.
+ * <p>
+ * It is possible
+ * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
+ * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
+ * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
+ * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
+ * Also if the callback function implementation itself throws an exception, this exception will be propagated to the current
+ * invocation of {@link KafkaConsumer#poll(java.time.Duration)} as well.
+ * <p>
+ * Note that callbacks only serve as notification of an assignment change.
+ * They cannot be used to express acceptance of the change.
+ * Hence throwing an exception from a callback does not affect the assignment in any way,
+ * as it will be propagated all the way up to the {@link KafkaConsumer#poll(java.time.Duration)} call.
+ * If the user catches the exception in the caller, the callback is still assumed successful and no further retries will be attempted.
+ * <p>
+ *
  * Here is pseudo-code for a callback implementation for saving offsets:
  * <pre>
  * {@code
@@ -68,6 +101,10 @@ import org.apache.kafka.common.TopicPartition;
  *              saveOffsetInExternalStore(consumer.position(partition));
  *       }
  *
+ *       public void onPartitionsLost(Collection<TopicPartition> partitions) {
+ *           // do not need to save the offsets since these partitions are probably owned by other consumers already
+ *       }
+ *
  *       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  *           // read the offsets from an external store using some custom code not described here
  *           for(TopicPartition partition: partitions)
@@ -82,7 +119,9 @@ public interface ConsumerRebalanceListener {
     /**
      * A callback method the user can implement to provide handling of offset commits to a customized store on the start
      * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
-     * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
+     * stops fetching data. It can also be called when consumer is being closed ({@link KafkaConsumer#close(Duration)})
+     * or is unsubscribing ({@link KafkaConsumer#unsubscribe()}).
+     * It is recommended that offsets should be committed in this callback to either Kafka or a
      * custom offset store to prevent duplicate data.
      * <p>
      * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
@@ -91,11 +130,12 @@ public interface ConsumerRebalanceListener {
      * <p>
      * It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible
      * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
-     * to be raised from one these nested invocations. In this case, the exception will be propagated to the current
+     * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
      * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
      * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
      *
-     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
+     * @param partitions The list of partitions that were assigned to the consumer and now need to be revoked (may not
+     *                   include all currently assigned partitions, i.e. there may still be some partitions left)
      * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
      * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
      */
@@ -106,20 +146,52 @@ public interface ConsumerRebalanceListener {
      * partition re-assignment. This method will be called after the partition re-assignment completes and before the
      * consumer starts fetching data, and only as the result of a {@link Consumer#poll(java.time.Duration) poll(long)} call.
      * <p>
-     * It is guaranteed that all the processes in a consumer group will execute their
+     * It is guaranteed that under normal conditions all the processes in a consumer group will execute their
      * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
-     * {@link #onPartitionsAssigned(Collection)} callback.
+     * {@link #onPartitionsAssigned(Collection)} callback. During exceptional scenarios, partitions may be migrated
+     * without the old owner being notified (i.e. their {@link #onPartitionsRevoked(Collection)} callback not triggered),
+     * and later when the old owner consumer realizes this event, the {@link #onPartitionsLost(Collection)} callback
+     * will then be triggered by the consumer.
      * <p>
      * It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible
      * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
-     * to be raised from one these nested invocations. In this case, the exception will be propagated to the current
+     * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
      * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
      * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
      *
-     * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
-     *            assigned to the consumer)
+     * @param partitions The list of partitions that are now assigned to the consumer (previously owned partitions will
+     *                   NOT be included, i.e. this list will only include newly added partitions)
      * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
      * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
      */
     void onPartitionsAssigned(Collection<TopicPartition> partitions);
+
+    /**
+     * A callback method you can implement to clean up resources for partitions that have already
+     * been reassigned to other consumers. This method will not be called during normal execution, as the owned partitions would
+     * first be revoked by calling {@link ConsumerRebalanceListener#onPartitionsRevoked} before being reassigned
+     * to other consumers during a rebalance event. However, during exceptional scenarios when the consumer realizes that it
+     * no longer owns these partitions, i.e. they were not revoked via a normal rebalance event, this method will be invoked.
+     * <p>
+     * For example, this function is called if a consumer's session timeout has expired, or if a fatal error has been
+     * received indicating the consumer is no longer part of the group.
+     * <p>
+     * By default it will just trigger {@link ConsumerRebalanceListener#onPartitionsRevoked}; users who want to distinguish
+     * the handling logic of revoked partitions vs. lost partitions can override the default implementation.
+     * <p>
+     * It is possible
+     * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
+     * to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
+     * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
+     * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
+     *
+     * @param partitions The list of partitions that were assigned to the consumer and now have been reassigned
+     *                   to other consumers (may not include all currently assigned partitions, i.e. there may still
+     *                   be some partitions left)
+     * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
+     * @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
+     */
+    default void onPartitionsLost(Collection<TopicPartition> partitions) {
+        onPartitionsRevoked(partitions);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 0d0efe1..be3e176 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1046,14 +1046,18 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
      * This also clears any partitions directly assigned through {@link #assign(Collection)}.
+     *
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. rebalance callback errors)
      */
     public void unsubscribe() {
         acquireAndEnsureOpen();
         try {
             fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
             this.subscriptions.unsubscribe();
-            if (this.coordinator != null)
+            if (this.coordinator != null) {
+                this.coordinator.onLeavePrepare();
                 this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
+            }
             log.info("Unsubscribed all topics or patterns and assigned partitions");
         } finally {
             release();
@@ -1176,7 +1180,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
      *             topics or to the configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
-     *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
+     *             session timeout, errors deserializing key/value pairs, exceptions thrown from your rebalance callback,
+     *             or any new error cases in future versions)
      * @throws java.lang.IllegalArgumentException if the timeout value is negative
      * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
      *             partitions to consume from
@@ -1190,6 +1195,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         return poll(time.timer(timeout), true);
     }
 
+    /**
+     * @throws KafkaException if the rebalance callback throws an exception
+     */
     private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
         acquireAndEnsureOpen();
         try {
@@ -1244,6 +1252,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         return updateFetchPositions(timer);
     }
 
+    /**
+     * @throws KafkaException if the rebalance callback throws an exception
+     */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
         long pollTimeout = coordinator == null ? timer.remainingMs() :
                 Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
@@ -2162,10 +2173,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         acquire();
         try {
             if (!closed) {
-                closed = true;
+                // need to close before setting the flag since the close function
+                // itself may trigger a rebalance callback that needs the consumer to still be open
                 close(timeout.toMillis(), false);
             }
         } finally {
+            closed = true;
             release();
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index fc385f6..4c71a89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -46,6 +46,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
@@ -202,6 +203,13 @@ public abstract class AbstractCoordinator implements Closeable {
                                            ByteBuffer memberAssignment);
 
     /**
+     * Invoked prior to each leave group event. This is typically used to clean up assigned partitions;
+     * note it is triggered by the consumer's API caller thread (i.e. the background heartbeat thread will
+     * not trigger it even if it forces leaving the group upon heartbeat session expiration).
+     */
+    protected void onLeavePrepare() {}
+
+    /**
      * Visible for testing.
      *
      * Ensure that the coordinator is ready to receive requests.
@@ -314,6 +322,7 @@ public abstract class AbstractCoordinator implements Closeable {
      * Ensure the group is active (i.e., joined and synced)
      *
      * @param timer Timer bounding how long this method can block
+     * @throws KafkaException if the callback throws an exception
      * @return true iff the group is active
      */
     boolean ensureActiveGroup(final Timer timer) {
@@ -362,6 +371,7 @@ public abstract class AbstractCoordinator implements Closeable {
      * Visible for testing.
      *
      * @param timer Timer bounding how long this method can block
+     * @throws KafkaException if the callback throws an exception
      * @return true iff the operation succeeded
      */
     boolean joinGroupIfNeeded(final Timer timer) {
@@ -376,8 +386,10 @@ public abstract class AbstractCoordinator implements Closeable {
             // refresh which changes the matched subscription set) can occur while another rebalance is
             // still in progress.
             if (needsJoinPrepare) {
-                onJoinPrepare(generation.generationId, generation.memberId);
+                // need to set the flag before calling onJoinPrepare since the user callback may throw
+                // an exception, in which case upon retry we should not retry onJoinPrepare either.
                 needsJoinPrepare = false;
+                onJoinPrepare(generation.generationId, generation.memberId);
             }
 
             final RequestFuture<ByteBuffer> future = initiateJoinGroup();
@@ -522,7 +534,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 future.raise(error);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                 // reset the member id and retry immediately
-                resetGeneration();
+                resetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error);
                 log.debug("Attempt to join group failed due to unknown member id.");
                 future.raise(error);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
@@ -645,7 +657,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
                         || error == Errors.ILLEGAL_GENERATION) {
                     log.debug("SyncGroup failed: {}", error.message());
-                    resetGeneration();
+                    resetGenerationOnResponseError(ApiKeys.SYNC_GROUP, error);
                     future.raise(error);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR) {
@@ -795,14 +807,20 @@ public abstract class AbstractCoordinator implements Closeable {
         return generation != null && generation.hasMemberId();
     }
 
-
-    /**
-     * Reset the generation and memberId because we have fallen out of the group.
-     */
-    protected synchronized void resetGeneration() {
+    private synchronized void resetGeneration() {
         this.generation = Generation.NO_GENERATION;
-        this.rejoinNeeded = true;
         this.state = MemberState.UNJOINED;
+        this.rejoinNeeded = true;
+    }
+
+    synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
+        log.debug("Resetting generation after encountering " + error + " from " + api + " response");
+        resetGeneration();
+    }
+
+    synchronized void resetGenerationOnLeaveGroup() {
+        log.debug("Resetting generation due to consumer pro-actively leaving the group");
+        resetGeneration();
     }
 
     protected synchronized void requestRejoin() {
@@ -817,6 +835,9 @@ public abstract class AbstractCoordinator implements Closeable {
         close(time.timer(0));
     }
 
+    /**
+     * @throws KafkaException if the rebalance callback throws an exception
+     */
     protected void close(Timer timer) {
         try {
             closeHeartbeatThread();
@@ -825,6 +846,7 @@ public abstract class AbstractCoordinator implements Closeable {
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
                 if (rebalanceConfig.leaveGroupOnClose) {
+                    onLeavePrepare();
                     maybeLeaveGroup("the consumer is being closed");
                 }
 
@@ -841,31 +863,31 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     /**
-     * Leave the current group and reset local generation/memberId.
-     * @param leaveReason reason to attempt leaving the group
+     * @throws KafkaException if the rebalance callback throws an exception
      */
     public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
         RequestFuture<Void> future = null;
+
         // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
         // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
         // and the membership expiration is only controlled by session timeout.
         if (isDynamicMember() && !coordinatorUnknown() &&
-                state != MemberState.UNJOINED && generation.hasMemberId()) {
+            state != MemberState.UNJOINED && generation.hasMemberId()) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.
             log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
-                     generation.memberId, coordinator, leaveReason);
+                generation.memberId, coordinator, leaveReason);
             LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
                 rebalanceConfig.groupId,
-                Collections.singletonList(new MemberIdentity()
-                                              .setMemberId(generation.memberId))
+                Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId))
             );
-            future = client.send(coordinator, request)
-                    .compose(new LeaveGroupResponseHandler());
+
+            future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler());
             client.pollNoWakeup();
         }
 
-        resetGeneration();
+        resetGenerationOnLeaveGroup();
+
         return future;
     }
 
@@ -926,14 +948,14 @@ public abstract class AbstractCoordinator implements Closeable {
                 future.raise(error);
             } else if (error == Errors.ILLEGAL_GENERATION) {
                 log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
-                resetGeneration();
+                resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
                 future.raise(error);
             } else if (error == Errors.FENCED_INSTANCE_ID) {
                 log.error("Received fatal exception: group.instance.id gets fenced");
                 future.raise(error);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                 log.info("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
-                resetGeneration();
+                resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
                 future.raise(error);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index aae16b9..a3aafac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
@@ -48,6 +49,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -73,6 +75,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -235,27 +238,80 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     private void maybeUpdateJoinedSubscription(Set<TopicPartition> assignedPartitions) {
-        // Check if the assignment contains some topics that were not in the original
-        // subscription, if yes we will obey what leader has decided and add these topics
-        // into the subscriptions as long as they still match the subscribed pattern
-
-        Set<String> addedTopics = new HashSet<>();
-        //this is a copy because its handed to listener below
-        for (TopicPartition tp : assignedPartitions) {
-            if (!joinedSubscription.contains(tp.topic()))
-                addedTopics.add(tp.topic());
+        if (subscriptions.hasPatternSubscription()) {
+            // Check if the assignment contains some topics that were not in the original
+            // subscription, if yes we will obey what leader has decided and add these topics
+            // into the subscriptions as long as they still match the subscribed pattern
+
+            Set<String> addedTopics = new HashSet<>();
+            // this is a copy because it's handed to the listener below
+            for (TopicPartition tp : assignedPartitions) {
+                if (!joinedSubscription.contains(tp.topic()))
+                    addedTopics.add(tp.topic());
+            }
+
+            if (!addedTopics.isEmpty()) {
+                Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
+                Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
+                newSubscription.addAll(addedTopics);
+                newJoinedSubscription.addAll(addedTopics);
+
+                if (this.subscriptions.subscribeFromPattern(newSubscription))
+                    metadata.requestUpdateForNewTopics();
+                this.joinedSubscription = newJoinedSubscription;
+            }
+        }
+    }
+
+    private Exception invokePartitionsAssigned(final Set<TopicPartition> assignedPartitions) {
+        log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+
+        ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
+        try {
+            listener.onPartitionsAssigned(assignedPartitions);
+        } catch (WakeupException | InterruptException e) {
+            throw e;
+        } catch (Exception e) {
+            log.error("User provided listener {} failed on invocation of onPartitionsAssigned for partitions {}",
+                listener.getClass().getName(), assignedPartitions, e);
+            return e;
+        }
+
+        return null;
+    }
+
+    private Exception invokePartitionsRevoked(final Set<TopicPartition> revokedPartitions) {
+        log.info("Revoke previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
+
+        ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
+        try {
+            listener.onPartitionsRevoked(revokedPartitions);
+        } catch (WakeupException | InterruptException e) {
+            throw e;
+        } catch (Exception e) {
+            log.error("User provided listener {} failed on invocation of onPartitionsRevoked for partitions {}",
+                listener.getClass().getName(), revokedPartitions, e);
+            return e;
         }
 
-        if (!addedTopics.isEmpty()) {
-            Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
-            Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
-            newSubscription.addAll(addedTopics);
-            newJoinedSubscription.addAll(addedTopics);
+        return null;
+    }
+
+    private Exception invokePartitionsLost(final Set<TopicPartition> lostPartitions) {
+        log.info("Lost previously assigned partitions {}", Utils.join(lostPartitions, ", "));
 
-            if (this.subscriptions.subscribeFromPattern(newSubscription))
-                metadata.requestUpdateForNewTopics();
-            this.joinedSubscription = newJoinedSubscription;
+        ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
+        try {
+            listener.onPartitionsLost(lostPartitions);
+        } catch (WakeupException | InterruptException e) {
+            throw e;
+        } catch (Exception e) {
+            log.error("User provided listener {} failed on invocation of onPartitionsLost for partitions {}",
+                listener.getClass().getName(), lostPartitions, e);
+            return e;
         }
+
+        return null;
     }
 
     @Override
@@ -263,6 +319,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                   String memberId,
                                   String assignmentStrategy,
                                   ByteBuffer assignmentBuffer) {
+        log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);
+
         // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
         if (!isLeader)
             assignmentSnapshot = null;
@@ -274,7 +332,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
 
         Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
-        if (!subscriptions.assignFromSubscribed(assignment.partitions())) {
+
+        Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());
+
+        if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
             log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " +
                 "that the subscription has changed since we joined the group. Will try re-join the group with current subscription",
                 assignment.partitions(), subscriptions.prettyString());
@@ -284,8 +345,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             return;
         }
 
-        Set<TopicPartition> assignedPartitions = subscriptions.assignedPartitions();
-
         // The leader may have assigned partitions which match our subscription pattern, but which
         // were not explicitly requested, so we update the joined subscription here.
         maybeUpdateJoinedSubscription(assignedPartitions);
@@ -299,63 +358,52 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);
 
         // execute the user's callback after rebalance
-        ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
+        final AtomicReference<Exception> firstException = new AtomicReference<>(null);
+        Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
+        addedPartitions.removeAll(ownedPartitions);
 
         switch (protocol) {
             case EAGER:
-                if (!ownedPartitions.isEmpty()) {
-                    log.info("Coordinator has owned partitions {} that are not revoked with {} protocol, " +
-                        "it is likely client is woken up before a previous pending rebalance completes its callback", ownedPartitions, protocol);
-                }
+                // assign partitions that are not yet owned
+                subscriptions.assignFromSubscribed(assignedPartitions);
+
+                firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
 
-                log.info("Setting newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
-                try {
-                    listener.onPartitionsAssigned(assignedPartitions);
-                } catch (WakeupException | InterruptException e) {
-                    throw e;
-                } catch (Exception e) {
-                    log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
-                }
                 break;
 
             case COOPERATIVE:
-                assignAndRevoke(listener, assignedPartitions, ownedPartitions);
-
-                break;
-        }
+                Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
+                revokedPartitions.removeAll(assignedPartitions);
+
+                log.info("Updating with newly assigned partitions: {}, compare with already owned partitions: {}, " +
+                        "newly added partitions: {}, revoking partitions: {}",
+                    Utils.join(assignedPartitions, ", "),
+                    Utils.join(ownedPartitions, ", "),
+                    Utils.join(addedPartitions, ", "),
+                    Utils.join(revokedPartitions, ", "));
+
+                // revoke partitions that were previously owned but are no longer assigned;
+                // note that we should only change the assignment AFTER we've triggered
+                // the revoke callback
+                if (!revokedPartitions.isEmpty()) {
+                    firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
+                }
 
-    }
+                subscriptions.assignFromSubscribed(assignedPartitions);
 
-    private void assignAndRevoke(final ConsumerRebalanceListener listener,
-                                 final Set<TopicPartition> assignedPartitions,
-                                 final Set<TopicPartition> ownedPartitions) {
-        Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
-        Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
-        addedPartitions.removeAll(ownedPartitions);
-        revokedPartitions.removeAll(assignedPartitions);
+                // add partitions that were not previously owned but are now assigned
+                firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
 
-        log.info("Updating with newly assigned partitions: {}, compare with already owned partitions: {}, " +
-                "newly added partitions: {}, revoking partitions: {}",
-            Utils.join(assignedPartitions, ", "),
-            Utils.join(ownedPartitions, ", "),
-            Utils.join(addedPartitions, ", "),
-            Utils.join(revokedPartitions, ", "));
+                // if any partitions were revoked, we need to re-join the group afterwards
+                if (!revokedPartitions.isEmpty()) {
+                    requestRejoin();
+                }
 
-        try {
-            listener.onPartitionsAssigned(addedPartitions);
-        } catch (WakeupException | InterruptException e) {
-            throw e;
-        } catch (Exception e) {
-            log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
+                break;
         }
 
-        try {
-            listener.onPartitionsRevoked(revokedPartitions);
-        } catch (WakeupException | InterruptException e) {
-            throw e;
-        } catch (Exception e) {
-            log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
-        }
+        if (firstException.get() != null)
+            throw new KafkaException("User rebalance callback throws an error", firstException.get());
     }
 
     void maybeUpdateSubscriptionMetadata() {
@@ -380,6 +428,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * Returns early if the timeout expires
      *
      * @param timer Timer bounding how long this method can block
+     * @throws KafkaException if the rebalance callback throws an exception
      * @return true iff the operation succeeded
      */
     public boolean poll(Timer timer) {
@@ -389,8 +438,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         if (subscriptions.partitionsAutoAssigned()) {
             if (protocol == null) {
-                throw new IllegalStateException("User confingure ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG to empty " +
-                    "while trying to subscribe for group protocol to auto assign partitions");
+                throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
+                    " to empty while trying to subscribe for group protocol to auto assign partitions");
             }
             // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
             // group proactively due to application inactivity even if (say) the coordinator cannot be found.
@@ -479,11 +528,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, Subscription> subscriptions = new HashMap<>();
+
+        // collect all the owned partitions
+        Map<String, List<TopicPartition>> ownedPartitions = new HashMap<>();
+
         for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
             Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
             subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
             subscriptions.put(memberSubscription.memberId(), subscription);
             allSubscribedTopics.addAll(subscription.topics());
+            ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());
         }
 
         // the leader will begin watching for changes to any of the topics the group is interested in,
@@ -496,6 +550,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
 
+        if (protocol == RebalanceProtocol.COOPERATIVE) {
+            validateCooperativeAssignment(ownedPartitions, assignments);
+        }
+
         // user-customized assignor may have created some topics that are not in the subscription list
         // and assign their partitions to the members; in this case we would like to update the leader's
         // own metadata with the newly added topics so that it will not trigger a subsequent rebalance
@@ -538,51 +596,137 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         return groupAssignment;
     }
 
+    /**
+     * Used by the COOPERATIVE rebalance protocol only.
+     *
+     * Validate the assignments returned by the assignor such that no owned partitions are going to
+     * be reassigned to a different consumer directly: if the assignor wants to reassign an owned partition,
+     * it must first remove it from the new assignment of the current owner so that it is not assigned to any
+     * member, and then in the next rebalance it can finally reassign those partitions not owned by anyone to consumers.
+     */
+    private void validateCooperativeAssignment(final Map<String, List<TopicPartition>> ownedPartitions,
+                                               final Map<String, Assignment> assignments) {
+        Set<TopicPartition> totalRevokedPartitions = new HashSet<>();
+        Set<TopicPartition> totalAddedPartitions = new HashSet<>();
+        for (final Map.Entry<String, Assignment> entry : assignments.entrySet()) {
+            final Assignment assignment = entry.getValue();
+            final Set<TopicPartition> addedPartitions = new HashSet<>(assignment.partitions());
+            addedPartitions.removeAll(ownedPartitions.get(entry.getKey()));
+            final Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions.get(entry.getKey()));
+            revokedPartitions.removeAll(assignment.partitions());
+
+            totalAddedPartitions.addAll(addedPartitions);
+            totalRevokedPartitions.addAll(revokedPartitions);
+        }
+
+        // if there is any overlap between revoked partitions and added partitions, it means some partitions
+        // are immediately re-assigned to another member while still claimed by their current owner
+        totalAddedPartitions.retainAll(totalRevokedPartitions);
+        if (!totalAddedPartitions.isEmpty()) {
+            log.error("With the COOPERATIVE protocol, owned partitions cannot be " +
+                "reassigned to other members; however the assignor has reassigned partitions {} which are still owned " +
+                "by some members; return the error code to all members to let them stop", totalAddedPartitions);
+
+            throw new IllegalStateException("Assignor supporting the COOPERATIVE protocol violates its requirements");
+        }
+    }
+
     @Override
     protected void onJoinPrepare(int generation, String memberId) {
+        log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
         // commit offsets prior to rebalance if auto-commit enabled
         maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
 
-        // execute the user's callback before rebalance
-        ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
+        // the generation / member-id can possibly be reset by the heartbeat thread
+        // upon getting errors or heartbeat timeouts; in this case whatever partitions were previously
+        // owned are considered lost, so we should trigger the callback and clean up the assignment;
+        // otherwise we can proceed normally and revoke the partitions depending on the protocol,
+        // and in that case we should only change the assignment AFTER the revoke callback is triggered
+        // so that users can still access the previously owned partitions to commit offsets etc.
+        Exception exception = null;
+        final Set<TopicPartition> revokedPartitions;
+        if (generation == Generation.NO_GENERATION.generationId &&
+            memberId.equals(Generation.NO_GENERATION.memberId)) {
+            revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+
+            if (!revokedPartitions.isEmpty()) {
+                log.info("Giving away all assigned partitions as lost since generation has been reset," +
+                    "indicating that consumer is no longer part of the group");
+                exception = invokePartitionsLost(revokedPartitions);
 
-        switch (protocol) {
-            case EAGER:
-                Set<TopicPartition> revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-                log.info("Revoking previously assigned partitions {}", revokedPartitions);
-                try {
-                    listener.onPartitionsRevoked(revokedPartitions);
-                } catch (WakeupException | InterruptException e) {
-                    throw e;
-                } catch (Exception e) {
-                    log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
-                }
-
-                // also clear the assigned partitions since all have been revoked
                 subscriptions.assignFromSubscribed(Collections.emptySet());
+            }
+        } else {
+            switch (protocol) {
+                case EAGER:
+                    // revoke all partitions
+                    revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+                    exception = invokePartitionsRevoked(revokedPartitions);
 
-                break;
+                    subscriptions.assignFromSubscribed(Collections.emptySet());
 
-            case COOPERATIVE:
-                break;
+                    break;
+
+                case COOPERATIVE:
+                    // only revoke those partitions that are not in the subscription any more.
+                    Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+                    revokedPartitions = ownedPartitions.stream()
+                        .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+                        .collect(Collectors.toSet());
+
+                    if (!revokedPartitions.isEmpty()) {
+                        exception = invokePartitionsRevoked(revokedPartitions);
+
+                        ownedPartitions.removeAll(revokedPartitions);
+                        subscriptions.assignFromSubscribed(ownedPartitions);
+                    }
+
+                    break;
+            }
         }
 
+
         isLeader = false;
         subscriptions.resetGroupSubscription();
+
+        if (exception != null) {
+            throw new KafkaException("User rebalance callback throws an error", exception);
+        }
+    }
+
+    @Override
+    public void onLeavePrepare() {
+        // we should reset the assignment and trigger the callback before leaving the group
+        Set<TopicPartition> droppedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+
+        if (subscriptions.partitionsAutoAssigned() && !droppedPartitions.isEmpty()) {
+            final Exception e = invokePartitionsRevoked(droppedPartitions);
+
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            if (e != null) {
+                throw new KafkaException("User rebalance callback throws an error", e);
+            }
+        }
     }
 
+    /**
+     * @throws KafkaException if the callback throws an exception
+     */
     @Override
     public boolean rejoinNeededOrPending() {
         if (!subscriptions.partitionsAutoAssigned())
             return false;
 
-        // we need to rejoin if we performed the assignment and metadata has changed
+        // we need to rejoin if we performed the assignment and metadata has changed;
+        // also for those owned-but-no-longer-existing partitions we should drop them as lost
         if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot))
             return true;
 
         // we need to join if our subscription has changed since the last join
-        if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
+        if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
             return true;
+        }
 
         return super.rejoinNeededOrPending();
     }
@@ -661,6 +805,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         return null;
     }
 
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
     public void close(final Timer timer) {
         // we do not need to re-enable wakeups since we are closing already
         client.disableWakeups();
@@ -990,7 +1137,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         } else if (error == Errors.UNKNOWN_MEMBER_ID
                                 || error == Errors.ILLEGAL_GENERATION) {
                             // need to reset generation and re-join group
-                            resetGeneration();
+                            resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
                             future.raise(new CommitFailedException());
                             return;
                         } else {
@@ -1121,8 +1268,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         private MetadataSnapshot(SubscriptionState subscription, Cluster cluster, int version) {
             Map<String, Integer> partitionsPerTopic = new HashMap<>();
-            for (String topic : subscription.groupSubscription())
-                partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic));
+            for (String topic : subscription.groupSubscription()) {
+                Integer numPartitions = cluster.partitionCountForTopic(topic);
+                if (numPartitions != null)
+                    partitionsPerTopic.put(topic, numPartitions);
+            }
             this.partitionsPerTopic = partitionsPerTopic;
             this.version = version;
         }
@@ -1130,6 +1280,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         boolean matches(MetadataSnapshot other) {
             return version == other.version || partitionsPerTopic.equals(other.partitionsPerTopic);
         }
+
+        Map<String, Integer> partitionsPerTopic() {
+            return partitionsPerTopic;
+        }
     }
 
     private static class OffsetCommitCompletion {
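
Note on the ConsumerCoordinator changes above: a failure in the user's rebalance listener is no longer swallowed; it is wrapped in the KafkaException("User rebalance callback throws an error", ...) shown above and surfaces from the consumer call that drove the rebalance, while the assignment update itself still completes (the testRebalanceException test later in this patch exercises exactly that path). A minimal, hedged sketch of what this looks like from the application side (the topic name, callback bodies, and the already-configured KafkaConsumer named "consumer" are illustrative, not part of this commit):

    consumer.subscribe(Collections.singleton("my-topic"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // finish or commit work for partitions we still own; may throw
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // set up state for the newly added partitions; may throw
        }
        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            // ownership is already gone (e.g. the generation was reset); local cleanup only
        }
    });

    try {
        consumer.poll(Duration.ofMillis(100));
    } catch (KafkaException fatal) {
        // fatal.getCause() is the exception thrown by the callback above;
        // the updated assignment has still been applied at this point
    }
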
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
index 1ecb15c..a66921d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java
@@ -77,7 +77,6 @@ public interface PartitionAssignor {
         onAssignment(assignment);
     }
 
-
     /**
      * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky")
      * @return non-null unique name
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index af834ce..7c965ae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -238,42 +238,42 @@ public class SubscriptionState {
     }
 
     /**
-     * Change the assignment to the specified partitions returned from the coordinator, note this is
-     * different from {@link #assignFromUser(Set)} which directly set the assignment from user inputs.
-     *
      * @return true if assignments matches subscription, otherwise false
      */
-    public synchronized boolean assignFromSubscribed(Collection<TopicPartition> assignments) {
-        if (!this.partitionsAutoAssigned())
-            throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");
-
-        boolean assignmentMatchedSubscription = true;
+    public synchronized boolean checkAssignmentMatchedSubscription(Collection<TopicPartition> assignments) {
         for (TopicPartition topicPartition : assignments) {
             if (this.subscribedPattern != null) {
-                assignmentMatchedSubscription = this.subscribedPattern.matcher(topicPartition.topic()).matches();
-                if (!assignmentMatchedSubscription) {
+                if (!this.subscribedPattern.matcher(topicPartition.topic()).matches()) {
                     log.info("Assigned partition {} for non-subscribed topic regex pattern; subscription pattern is {}",
-                            topicPartition,
-                            this.subscribedPattern);
-                    break;
+                        topicPartition,
+                        this.subscribedPattern);
+
+                    return false;
                 }
             } else {
-                assignmentMatchedSubscription = this.subscription.contains(topicPartition.topic());
-                if (!assignmentMatchedSubscription) {
+                if (!this.subscription.contains(topicPartition.topic())) {
                     log.info("Assigned partition {} for non-subscribed topic; subscription is {}", topicPartition, this.subscription);
-                    break;
+
+                    return false;
                 }
             }
         }
 
-        if (assignmentMatchedSubscription) {
-            Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(
-                    assignments);
-            assignmentId++;
-            this.assignment.set(assignedPartitionStates);
-        }
+        return true;
+    }
 
-        return assignmentMatchedSubscription;
+    /**
+     * Change the assignment to the specified partitions returned from the coordinator; note this is
+     * different from {@link #assignFromUser(Set)}, which directly sets the assignment from user inputs.
+     */
+    public synchronized void assignFromSubscribed(Collection<TopicPartition> assignments) {
+        if (!this.partitionsAutoAssigned())
+            throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");
+
+        Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(assignments);
+        assignmentId++;
+        this.assignment.set(assignedPartitionStates);
     }
 
     private void registerRebalanceListener(ConsumerRebalanceListener listener) {
@@ -652,8 +652,13 @@ public class SubscriptionState {
     }
 
     synchronized void requestFailed(Set<TopicPartition> partitions, long nextRetryTimeMs) {
-        for (TopicPartition partition : partitions)
-            assignedState(partition).requestFailed(nextRetryTimeMs);
+        for (TopicPartition partition : partitions) {
+            // by the time the request failed, the assignment may no longer
+            // contain this partition, in which case we just ignore it.
+            final TopicPartitionState state = assignedStateOrNull(partition);
+            if (state != null)
+                state.requestFailed(nextRetryTimeMs);
+        }
     }
 
     synchronized void movePartitionToEnd(TopicPartition tp) {
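
The SubscriptionState change above splits the old boolean assignFromSubscribed(Collection) into a pure validation step (checkAssignmentMatchedSubscription) and an unconditional assignment step (assignFromSubscribed). A hedged sketch of the check-then-assign pattern a caller such as the coordinator is expected to follow; the rejoin handling shown here is illustrative only:

    // "subscriptions" is the SubscriptionState; "assignedPartitions" came back from the assignor
    if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
        // the assignment no longer matches the (possibly changed) subscription,
        // so do not apply it; illustrative handling would be to request a re-join
        requestRejoin();
    } else {
        // safe to apply: replaces the assignment and bumps the assignment id
        subscriptions.assignFromSubscribed(assignedPartitions);
    }
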
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 1227c27..7892d6d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1332,7 +1332,7 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void closeShouldBeIdempotent() {
+    public void testCloseShouldBeIdempotent() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
         consumer.close();
         consumer.close();
@@ -1419,7 +1419,7 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void shouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
+    public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
         Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
@@ -1639,6 +1639,52 @@ public class KafkaConsumerTest {
         consumer.committed(tp0);
     }
 
+    @Test
+    public void testRebalanceException() {
+        Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+
+        consumer.subscribe(singleton(topic), getExceptionConsumerRebalanceListener());
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
+        client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);
+
+        // assign throws
+        try {
+            consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
+            fail("Should throw exception");
+        } catch (Throwable e) {
+            assertEquals("boom!", e.getCause().getMessage());
+        }
+
+        // the assignment is still updated regardless of the exception
+        assertEquals(singleton(tp0), subscription.assignedPartitions());
+
+        // close's revoke throws
+        try {
+            consumer.close(Duration.ofMillis(0));
+            fail("Should throw exception");
+        } catch (Throwable e) {
+            assertEquals("boom!", e.getCause().getCause().getMessage());
+        }
+
+        consumer.close(Duration.ofMillis(0));
+
+        // the assignment is still updated regardless of the exception
+        assertTrue(subscription.assignedPartitions().isEmpty());
+    }
+
     private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
         Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
@@ -1669,6 +1715,20 @@ public class KafkaConsumerTest {
         };
     }
 
+    private ConsumerRebalanceListener getExceptionConsumerRebalanceListener() {
+        return new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                throw new RuntimeException("boom!");
+            }
+
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                throw new RuntimeException("boom!");
+            }
+        };
+    }
+
     private ConsumerMetadata createMetadata(SubscriptionState subscription) {
         return new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
                                     subscription, new LogContext(), new ClusterResourceListeners());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
index 02fb9ff..e762280 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
@@ -188,14 +188,10 @@ public class RoundRobinAssignorTest {
         partitionsPerTopic.put(topic2, 3);
 
         Map<String, Subscription> consumers = new HashMap<>();
-        Subscription consumer1Subscription = new Subscription(topics(topic1, topic2),
-                                                              null,
-                                                              Collections.emptyList());
+        Subscription consumer1Subscription = new Subscription(topics(topic1, topic2), null);
         consumer1Subscription.setGroupInstanceId(Optional.of(instance1));
         consumers.put(consumer1, consumer1Subscription);
-        Subscription consumer2Subscription = new Subscription(topics(topic1, topic2),
-                                                              null,
-                                                              Collections.emptyList());
+        Subscription consumer2Subscription = new Subscription(topics(topic1, topic2), null);
         consumer2Subscription.setGroupInstanceId(Optional.of(instance2));
         consumers.put(consumer2, consumer2Subscription);
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
@@ -219,9 +215,7 @@ public class RoundRobinAssignorTest {
 
         Map<String, Subscription> consumers = new HashMap<>();
 
-        Subscription consumer1Subscription = new Subscription(topics(topic1, topic2),
-                                                              null,
-                                                              Collections.emptyList());
+        Subscription consumer1Subscription = new Subscription(topics(topic1, topic2), null);
         consumer1Subscription.setGroupInstanceId(Optional.of(instance1));
         consumers.put(consumer1, consumer1Subscription);
         consumers.put(consumer2, new Subscription(topics(topic1, topic2)));
@@ -257,9 +251,7 @@ public class RoundRobinAssignorTest {
         partitionsPerTopic.put(topic2, 3);
         Map<String, Subscription> consumers = new HashMap<>();
         for (MemberInfo m : staticMemberInfos) {
-            Subscription subscription = new Subscription(topics(topic1, topic2),
-                                                         null,
-                                                         Collections.emptyList());
+            Subscription subscription = new Subscription(topics(topic1, topic2), null);
             subscription.setGroupInstanceId(m.groupInstanceId);
             consumers.put(m.memberId, subscription);
         }
@@ -333,9 +325,7 @@ public class RoundRobinAssignorTest {
         partitionsPerTopic.put(topic2, 3);
         Map<String, Subscription> consumers = new HashMap<>();
         for (MemberInfo m : staticMemberInfos) {
-            Subscription subscription = new Subscription(topics(topic1, topic2),
-                                                         null,
-                                                         Collections.emptyList());
+            Subscription subscription = new Subscription(topics(topic1, topic2), null);
             subscription.setGroupInstanceId(m.groupInstanceId);
             consumers.put(m.memberId, subscription);
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 11273cb..91d3529 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -100,6 +101,8 @@ import java.util.regex.Pattern;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
+import static org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
 import static org.apache.kafka.test.TestUtils.toSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -201,18 +204,18 @@ public class ConsumerCoordinatorTest {
     public void testSelectRebalanceProtcol() {
         List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
         assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER)));
-        assignors.add(new MockPartitionAssignor(Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+        assignors.add(new MockPartitionAssignor(Collections.singletonList(COOPERATIVE)));
 
         // no commonly supported protocols
         assertThrows(IllegalArgumentException.class, () -> buildCoordinator(rebalanceConfig, new Metrics(), assignors, false));
 
         assignors.clear();
-        assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
-        assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
+        assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, COOPERATIVE)));
+        assignors.add(new MockPartitionAssignor(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER, COOPERATIVE)));
 
         // select higher indexed (more advanced) protocols
         try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false)) {
-            assertEquals(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE, coordinator.getProtocol());
+            assertEquals(COOPERATIVE, coordinator.getProtocol());
         }
     }
 
@@ -247,7 +250,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
+        client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.emptyMap(),
                 Errors.GROUP_AUTHORIZATION_FAILED));
         coordinator.poll(time.timer(Long.MAX_VALUE));
     }
@@ -274,7 +277,7 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() throws Exception {
+    public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
@@ -302,7 +305,7 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
-    public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() throws Exception {
+    public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() {
         // When the coordinator is marked dead, all unsent or in-flight requests are cancelled
         // with a disconnect error. This test case ensures that the corresponding callbacks see
         // the coordinator as unknown which prevents additional retries to the same coordinator.
@@ -389,10 +392,15 @@ public class ConsumerCoordinatorTest {
         assertTrue(future.failed());
         assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception());
         assertTrue(coordinator.rejoinNeededOrPending());
+
+        coordinator.poll(time.timer(0));
+
+        assertEquals(1, rebalanceListener.lostCount);
+        assertEquals(Collections.singleton(t1p), rebalanceListener.lost);
     }
 
     @Test
-    public void testUnknownConsumerId() {
+    public void testUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
@@ -413,6 +421,11 @@ public class ConsumerCoordinatorTest {
         assertTrue(future.failed());
         assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), future.exception());
         assertTrue(coordinator.rejoinNeededOrPending());
+
+        coordinator.poll(time.timer(0));
+
+        assertEquals(1, rebalanceListener.lostCount);
+        assertEquals(Collections.singleton(t1p), rebalanceListener.lost);
     }
 
     @Test
@@ -448,7 +461,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(),
+        client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.emptyMap(),
                 Errors.INVALID_GROUP_ID));
         coordinator.poll(time.timer(Long.MAX_VALUE));
     }
@@ -484,8 +497,8 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(toSet(assigned), subscriptions.assignedPartitions());
         assertEquals(subscription, subscriptions.groupSubscription());
-        assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
@@ -538,23 +551,13 @@ public class ConsumerCoordinatorTest {
 
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
-        final Collection<TopicPartition> revoked = getRevoked(owned, newAssignment);
         final Collection<TopicPartition> assigned = getAdded(owned, newAssignment);
 
-        int revokeCount = 1;
-        final int addCount = 1;
-
-        // with eager protocol we will call revoke on the old assignment as well
-        if (protocol == ConsumerPartitionAssignor.RebalanceProtocol.EAGER) {
-            revokeCount += 1;
-        }
-
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(toSet(newAssignment), subscriptions.assignedPartitions());
         assertEquals(toSet(newSubscription), subscriptions.groupSubscription());
-        assertEquals(revokeCount, rebalanceListener.revokedCount);
-        assertEquals(revoked, rebalanceListener.revoked);
-        assertEquals(addCount, rebalanceListener.assignedCount);
+        assertEquals(protocol == EAGER ? 1 : 0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(assigned, rebalanceListener.assigned);
     }
 
@@ -593,8 +596,9 @@ public class ConsumerCoordinatorTest {
         assertEquals(2, subscriptions.numAssignedPartitions());
         assertEquals(2, subscriptions.groupSubscription().size());
         assertEquals(2, subscriptions.subscription().size());
-        assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
+        // callback not triggered at all since there's nothing to be revoked
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
@@ -604,8 +608,6 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "leader";
         final List<TopicPartition> owned = Collections.emptyList();
         final List<TopicPartition> oldAssigned = Arrays.asList(t1p);
-
-
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
         client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
         coordinator.maybeUpdateSubscriptionMetadata();
@@ -635,14 +637,15 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(singleton(topic1), subscriptions.subscription());
         assertEquals(toSet(oldAssigned), subscriptions.assignedPartitions());
-        assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(owned, oldAssigned), rebalanceListener.revoked);
+        // nothing to be revoked and hence no callback triggered
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(getAdded(owned, oldAssigned), rebalanceListener.assigned);
 
         List<TopicPartition> newAssigned = Arrays.asList(t1p, t2p);
 
-        Map<String, List<String>> updatedSubscriptions = singletonMap(consumerId, Arrays.asList(topic1, topic2));
+        final Map<String, List<String>> updatedSubscriptions = singletonMap(consumerId, Arrays.asList(topic1, topic2));
         partitionAssignor.prepare(singletonMap(consumerId, newAssigned));
 
         // we expect to see a second rebalance with the new-found topics
@@ -658,17 +661,63 @@ public class ConsumerCoordinatorTest {
             metadata.rewind();
             return subscription.topics().containsAll(updatedSubscription);
         }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
-        client.prepareResponse(syncGroupResponse(newAssigned, Errors.NONE));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            // update the metadata again back to topic1
+            @Override
+            public boolean matches(AbstractRequest body) {
+                client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
+                return true;
+            }
+        }, syncGroupResponse(newAssigned, Errors.NONE));
 
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
+        Collection<TopicPartition> revoked = getRevoked(oldAssigned, newAssigned);
+        int revokedCount = revoked.isEmpty() ? 0 : 1;
+
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(toSet(updatedSubscription), subscriptions.subscription());
         assertEquals(toSet(newAssigned), subscriptions.assignedPartitions());
-        assertEquals(2, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(oldAssigned, newAssigned), rebalanceListener.revoked);
+        assertEquals(revokedCount, rebalanceListener.revokedCount);
+        assertEquals(revoked.isEmpty() ? null : revoked, rebalanceListener.revoked);
         assertEquals(2, rebalanceListener.assignedCount);
         assertEquals(getAdded(oldAssigned, newAssigned), rebalanceListener.assigned);
+
+        // we expect to see a third rebalance with the new-found topics
+        partitionAssignor.prepare(singletonMap(consumerId, oldAssigned));
+
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                JoinGroupRequest join = (JoinGroupRequest) body;
+                Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator =
+                    join.data().protocols().iterator();
+                assertTrue(protocolIterator.hasNext());
+                JoinGroupRequestData.JoinGroupRequestProtocol protocolMetadata = protocolIterator.next();
+
+                ByteBuffer metadata = ByteBuffer.wrap(protocolMetadata.metadata());
+                ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(metadata);
+                metadata.rewind();
+                return subscription.topics().contains(topic1);
+            }
+        }, joinGroupLeaderResponse(3, consumerId, initialSubscription, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(oldAssigned, Errors.NONE));
+
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        revoked = getRevoked(newAssigned, oldAssigned);
+        assertFalse(revoked.isEmpty());
+        revokedCount += 1;
+        Collection<TopicPartition> added = getAdded(newAssigned, oldAssigned);
+
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(singleton(topic1), subscriptions.subscription());
+        assertEquals(toSet(oldAssigned), subscriptions.assignedPartitions());
+        assertEquals(revokedCount, rebalanceListener.revokedCount);
+        assertEquals(revoked.isEmpty() ? null : revoked, rebalanceListener.revoked);
+        assertEquals(3, rebalanceListener.assignedCount);
+        assertEquals(added, rebalanceListener.assigned);
+        assertEquals(0, rebalanceListener.lostCount);
     }
 
     @Test
@@ -745,8 +794,8 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(toSet(assigned), subscriptions.assignedPartitions());
-        assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
@@ -777,8 +826,8 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(toSet(assigned), subscriptions.assignedPartitions());
         assertEquals(subscription, subscriptions.groupSubscription());
-        assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
@@ -844,8 +893,8 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(assigned.size(), subscriptions.numAssignedPartitions());
         assertEquals(subscription, subscriptions.subscription());
-        assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
@@ -955,7 +1004,7 @@ public class ConsumerCoordinatorTest {
 
         // join initially, but let coordinator returns unknown member id
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_MEMBER_ID));
+        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_MEMBER_ID));
 
         // now we should see a new join with the empty UNKNOWN_MEMBER_ID
         client.prepareResponse(body -> {
@@ -981,7 +1030,7 @@ public class ConsumerCoordinatorTest {
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.REBALANCE_IN_PROGRESS));
+        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.REBALANCE_IN_PROGRESS));
 
         // then let the full join/sync finish successfully
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
@@ -1004,7 +1053,7 @@ public class ConsumerCoordinatorTest {
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION));
+        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.ILLEGAL_GENERATION));
 
         // then let the full join/sync finish successfully
         client.prepareResponse(body -> {
@@ -1140,7 +1189,7 @@ public class ConsumerCoordinatorTest {
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
         assertFalse(coordinator.rejoinNeededOrPending());
-        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(0, rebalanceListener.revokedCount);
         assertEquals(2, rebalanceListener.assignedCount);
     }
 
@@ -1180,6 +1229,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
         assertFalse(coordinator.rejoinNeededOrPending());
+        // callback not triggered since there's nothing to be assigned
         assertEquals(Collections.emptySet(), rebalanceListener.assigned);
         assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
 
@@ -1242,8 +1292,8 @@ public class ConsumerCoordinatorTest {
         // join the group once
         joinAsFollowerAndReceiveAssignment("consumer", coordinator, assigned);
 
-        assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
 
@@ -1255,10 +1305,12 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(syncGroupResponse(assigned, Errors.NONE));
         coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
 
-        assertEquals(2, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(assigned, assigned), rebalanceListener.revoked);
+        Collection<TopicPartition> revoked = getRevoked(assigned, assigned);
+        Collection<TopicPartition> added = getAdded(assigned, assigned);
+        assertEquals(revoked.isEmpty() ? 0 : 1, rebalanceListener.revokedCount);
+        assertEquals(revoked.isEmpty() ? null : revoked, rebalanceListener.revoked);
         assertEquals(2, rebalanceListener.assignedCount);
-        assertEquals(getAdded(assigned, assigned), rebalanceListener.assigned);
+        assertEquals(added, rebalanceListener.assigned);
     }
 
     @Test
@@ -1279,8 +1331,9 @@ public class ConsumerCoordinatorTest {
 
         assertFalse(coordinator.rejoinNeededOrPending());
         assertEquals(toSet(assigned), subscriptions.assignedPartitions());
-        assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(getRevoked(owned, assigned), rebalanceListener.revoked);
+        // nothing to be revoked hence callback not triggered
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertNull(rebalanceListener.revoked);
         assertEquals(1, rebalanceListener.assignedCount);
         assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
@@ -1348,8 +1401,7 @@ public class ConsumerCoordinatorTest {
     public void testAutoCommitDynamicAssignment() {
         final String consumerId = "consumer";
 
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
-            true)
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)
         ) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
@@ -1365,8 +1417,7 @@ public class ConsumerCoordinatorTest {
     public void testAutoCommitRetryBackoff() {
         final String consumerId = "consumer";
 
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
-            true)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
@@ -1439,8 +1490,7 @@ public class ConsumerCoordinatorTest {
     public void testAutoCommitDynamicAssignmentRebalance() {
         final String consumerId = "consumer";
 
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
-            true)) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
             coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -1657,7 +1707,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        final List<OffsetAndMetadata> committedOffsets = Collections.synchronizedList(new ArrayList<OffsetAndMetadata>());
+        final List<OffsetAndMetadata> committedOffsets = Collections.synchronizedList(new ArrayList<>());
         final OffsetAndMetadata firstOffset = new OffsetAndMetadata(0L);
         final OffsetAndMetadata secondOffset = new OffsetAndMetadata(1L);
 
@@ -1929,7 +1979,7 @@ public class ConsumerCoordinatorTest {
         assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions());
         assertEquals(Collections.emptySet(), subscriptions.partitionsNeedingReset(time.milliseconds()));
         assertFalse(subscriptions.hasAllFetchPositions());
-        assertEquals(null, subscriptions.position(t1p));
+        assertNull(subscriptions.position(t1p));
     }
 
     @Test
@@ -1977,7 +2027,7 @@ public class ConsumerCoordinatorTest {
     public void testThreadSafeAssignedPartitionsMetric() throws Exception {
         // Get the assigned-partitions metric
         final Metric metric = metrics.metric(new MetricName("assigned-partitions", "consumer" + groupId + "-coordinator-metrics",
-                "", Collections.<String, String>emptyMap()));
+                "", Collections.emptyMap()));
 
         // Start polling the metric in the background
         final AtomicBoolean doStop = new AtomicBoolean();
@@ -2203,8 +2253,9 @@ public class ConsumerCoordinatorTest {
             client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
             client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
             coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
-        } else
+        } else {
             subscriptions.assignFromUser(singleton(t1p));
+        }
 
         subscriptions.seek(t1p, 100);
         coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -2277,6 +2328,11 @@ public class ConsumerCoordinatorTest {
         coordinator.close();
         assertTrue("Commit not requested", commitRequested.get());
         assertEquals("leaveGroupRequested should be " + shouldLeaveGroup, shouldLeaveGroup, leaveGroupRequested.get());
+
+        if (shouldLeaveGroup) {
+            assertEquals(1, rebalanceListener.revokedCount);
+            assertEquals(singleton(t1p), rebalanceListener.revoked);
+        }
     }
 
     private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig,
@@ -2488,12 +2544,13 @@ public class ConsumerCoordinatorTest {
     }
 
     private static class MockRebalanceListener implements ConsumerRebalanceListener {
+        public Collection<TopicPartition> lost;
         public Collection<TopicPartition> revoked;
         public Collection<TopicPartition> assigned;
+        public int lostCount = 0;
         public int revokedCount = 0;
         public int assignedCount = 0;
 
-
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             this.assigned = partitions;
@@ -2506,5 +2563,10 @@ public class ConsumerCoordinatorTest {
             revokedCount++;
         }
 
+        @Override
+        public void onPartitionsLost(Collection<TopicPartition> partitions) {
+            this.lost = partitions;
+            lostCount++;
+        }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 484b9de..c3ce02a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -62,7 +62,7 @@ public class SubscriptionStateTest {
         state.seek(tp0, 1);
         assertTrue(state.isFetchable(tp0));
         assertEquals(1L, state.position(tp0).offset);
-        state.assignFromUser(Collections.<TopicPartition>emptySet());
+        state.assignFromUser(Collections.emptySet());
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
         assertFalse(state.isAssigned(tp0));
@@ -88,7 +88,8 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        assertTrue(state.assignFromSubscribed(singleton(t1p0)));
+        assertTrue(state.checkAssignmentMatchedSubscription(singleton(t1p0)));
+        state.assignFromSubscribed(singleton(t1p0));
         // assigned partitions should immediately change
         assertEquals(singleton(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
@@ -116,13 +117,17 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
 
-        assertTrue(state.assignFromSubscribed(singleton(tp1)));
+        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
+        state.assignFromSubscribed(singleton(tp1));
+
         // assigned partitions should immediately change
         assertEquals(singleton(tp1), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
         assertEquals(singleton(topic), state.subscription());
 
-        assertTrue(state.assignFromSubscribed(Collections.singletonList(t1p0)));
+        assertTrue(state.checkAssignmentMatchedSubscription(singleton(t1p0)));
+        state.assignFromSubscribed(singleton(t1p0));
+
         // assigned partitions should immediately change
         assertEquals(singleton(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
@@ -138,7 +143,9 @@ public class SubscriptionStateTest {
         assertEquals(singleton(t1p0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
-        assertTrue(state.assignFromSubscribed(Collections.singletonList(tp0)));
+        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
+        state.assignFromSubscribed(singleton(tp0));
+
         // assigned partitions should immediately change
         assertEquals(singleton(tp0), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
@@ -164,7 +171,8 @@ public class SubscriptionStateTest {
 
         Set<TopicPartition> autoAssignment = Utils.mkSet(t1p0);
         state.subscribe(singleton(topic1), rebalanceListener);
-        assertTrue(state.assignFromSubscribed(autoAssignment));
+        assertTrue(state.checkAssignmentMatchedSubscription(autoAssignment));
+        state.assignFromSubscribed(autoAssignment);
         assertEquals(3, state.assignmentId());
         assertEquals(autoAssignment, state.assignedPartitions());
     }
@@ -192,10 +200,14 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertEquals(0, state.numAssignedPartitions());
         assertTrue(state.partitionsAutoAssigned());
-        assertTrue(state.assignFromSubscribed(singleton(tp0)));
+        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
+        state.assignFromSubscribed(singleton(tp0));
+
         state.seek(tp0, 1);
         assertEquals(1L, state.position(tp0).offset);
-        assertTrue(state.assignFromSubscribed(singleton(tp1)));
+        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
+        state.assignFromSubscribed(singleton(tp1));
+
         assertTrue(state.isAssigned(tp1));
         assertFalse(state.isAssigned(tp0));
         assertFalse(state.isFetchable(tp1));
@@ -217,21 +229,23 @@ public class SubscriptionStateTest {
     @Test(expected = IllegalStateException.class)
     public void invalidPositionUpdate() {
         state.subscribe(singleton(topic), rebalanceListener);
-        assertTrue(state.assignFromSubscribed(singleton(tp0)));
+        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp0)));
+        state.assignFromSubscribed(singleton(tp0));
+
         state.position(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), leaderAndEpoch));
     }
 
     @Test
     public void cantAssignPartitionForUnsubscribedTopics() {
         state.subscribe(singleton(topic), rebalanceListener);
-        assertFalse(state.assignFromSubscribed(Collections.singletonList(t1p0)));
+        assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
     }
 
     @Test
     public void cantAssignPartitionForUnmatchedPattern() {
         state.subscribe(Pattern.compile(".*t"), rebalanceListener);
         state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));
-        assertFalse(state.assignFromSubscribed(Collections.singletonList(t1p0)));
+        assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -291,7 +305,9 @@ public class SubscriptionStateTest {
     public void unsubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
         state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));
-        assertTrue(state.assignFromSubscribed(singleton(tp1)));
+        assertTrue(state.checkAssignmentMatchedSubscription(singleton(tp1)));
+        state.assignFromSubscribed(singleton(tp1));
+
         assertEquals(singleton(tp1), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
 
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b6cd60b..8a603c3 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -354,6 +354,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
                                           customRebalanceListener: Option[ConsumerRebalanceListener])
     extends ConsumerRebalanceListener {
 
+    override def onPartitionsLost(partitions: util.Collection[TopicPartition]) {}
+
     override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
       producer.flush()
       commitOffsets(consumerWrapper)
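
The MirrorMaker listener above adds an empty onPartitionsLost override while keeping the flush-and-commit behaviour in onPartitionsRevoked, presumably because committing only makes sense for partitions being revoked in an orderly fashion, not for partitions whose ownership has already been lost. A hedged sketch of the same split for a generic Java consumer application (all names are illustrative):

    ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // we still own these partitions, so a final synchronous commit is safe
            consumer.commitSync();
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            // ownership is already gone; skip committing and only clean up local state
        }
    };
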
diff --git a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index 210ceb2..45743b2 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -61,6 +61,8 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
   this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
   this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
+  this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "6000")
 
   override protected def brokerPropertyOverrides(properties: Properties): Unit = {
     properties.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
@@ -332,28 +334,28 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
     @volatile var thrownException: Option[Throwable] = None
     @volatile var receivedMessages = 0
 
-    @volatile private var partitionAssignment: Set[TopicPartition] = partitionsToAssign
+    @volatile private var partitionAssignment: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
     @volatile private var subscriptionChanged = false
     private var topicsSubscription = topicsToSubscribe
 
     val rebalanceListener: ConsumerRebalanceListener = new ConsumerRebalanceListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
-        partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*)
+        partitionAssignment ++= partitions.toArray(new Array[TopicPartition](0))
       }
 
       override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {
-        partitionAssignment = Set.empty[TopicPartition]
+        partitionAssignment --= partitions.toArray(new Array[TopicPartition](0))
       }
     }
 
-    if (partitionAssignment.isEmpty) {
+    if (partitionsToAssign.isEmpty) {
       consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
     } else {
-      consumer.assign(partitionAssignment.asJava)
+      consumer.assign(partitionsToAssign.asJava)
     }
 
     def consumerAssignment(): Set[TopicPartition] = {
-      partitionAssignment
+      partitionAssignment.toSet
     }
 
     /**
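
The AbstractConsumerTest change above stops snapshotting consumer.assignment() and instead maintains the owned set incrementally, since the collections passed to the callbacks may now be the partitions added or revoked in that rebalance rather than the full assignment. A hedged Java equivalent of that bookkeeping (field and listener names are illustrative):

    // incremental tracking of owned partitions across rebalances
    final Set<TopicPartition> owned = ConcurrentHashMap.newKeySet();

    ConsumerRebalanceListener tracker = new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            owned.addAll(partitions);    // only the newly added partitions are passed in
        }
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            owned.removeAll(partitions); // only the revoked partitions are passed in
        }
        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            owned.removeAll(partitions); // lost partitions are dropped as well
        }
    };
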
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 778e3b8..634bcda 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -246,7 +246,9 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     killBroker(findCoordinator(manualGroup))
 
     val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs)
+
     val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, gracefulCloseTimeMs)
+
     future1.get
     future2.get
 
@@ -286,7 +288,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
 
     servers.foreach(server => killBroker(server.config.brokerId))
     val closeTimeout = 2000
-    val future1 = submitCloseAndValidate(consumer1, closeTimeout, Some(closeTimeout), Some(closeTimeout))
+    val future1 = submitCloseAndValidate(consumer1, closeTimeout, None, Some(closeTimeout))
     val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, Some(requestTimeout), Some(requestTimeout))
     future1.get
     future2.get
@@ -384,13 +386,8 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
 
     def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = {
       executor.submit(CoreUtils.runnable {
-          consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener {
-            def onPartitionsAssigned(partitions: Collection[TopicPartition]) {
-            }
-            def onPartitionsRevoked(partitions: Collection[TopicPartition]) {
-              revokeSemaphore.foreach(s => s.release())
-            }
-          })
+        consumer.subscribe(Collections.singletonList(topic))
+        revokeSemaphore.foreach(s => s.release())
         // requires to used deprecated `poll(long)` to trigger metadata update
           consumer.poll(0L)
         }, 0)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 33c14eb..30b1d13 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -185,14 +185,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // rebalance to get the initial assignment
     awaitRebalance(consumer, listener)
     assertEquals(1, listener.callsToAssigned)
-    assertEquals(1, listener.callsToRevoked)
+    assertEquals(0, listener.callsToRevoked)
 
     Thread.sleep(3500)
 
     // we should fall out of the group and need to rebalance
     awaitRebalance(consumer, listener)
     assertEquals(2, listener.callsToAssigned)
-    assertEquals(2, listener.callsToRevoked)
+    assertEquals(1, listener.callsToRevoked)
   }
 
   @Test
@@ -207,8 +207,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     var committedPosition: Long = -1
 
     val listener = new TestConsumerReassignmentListener {
+      override def onPartitionsLost(partitions: util.Collection[TopicPartition]): Unit = {}
       override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
-        if (callsToRevoked > 0) {
+        if (!partitions.isEmpty && partitions.contains(tp)) {
           // on the second rebalance (after we have joined the group initially), sleep longer
           // than session timeout and then try a commit. We should still be in the group,
           // so the commit should succeed
@@ -1641,6 +1642,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers
     timeoutPoller.shutdown()
+    consumerPollers -= timeoutPoller
     if (closeConsumer)
       timeoutConsumer.close()
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bbf9f9c..412d5e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -190,8 +190,11 @@ public class KafkaStreams implements AutoCloseable {
      * - Of special importance: If the global stream thread dies, or all stream threads die (or both) then
      *   the instance will be in the ERROR state. The user will need to close it.
      */
+    // TODO: the current transitions from other states directly to RUNNING are due to
+    //       the fact that onPartitionsRevoked may not be triggered. We need to refactor the
+    //       state diagram more thoroughly after we refactor StreamsPartitionAssignor to support COOPERATIVE
     public enum State {
-        CREATED(1, 3), REBALANCING(2, 3, 5), RUNNING(1, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3);
+        CREATED(1, 2, 3), REBALANCING(2, 3, 5), RUNNING(1, 2, 3, 5), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3);
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d3efa9e..cd5fc83 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -127,7 +127,10 @@ public class StreamThread extends Thread {
      * </ul>
      */
     public enum State implements ThreadStateTransitionValidator {
-        CREATED(1, 5), STARTING(2, 5), PARTITIONS_REVOKED(3, 5), PARTITIONS_ASSIGNED(2, 4, 5), RUNNING(2, 5), PENDING_SHUTDOWN(6), DEAD;
+        // TODO: the current transitions from other states directly to PARTITIONS_REVOKED are due to
+        //       the fact that onPartitionsRevoked may not be triggered. We need to refactor the
+        //       state diagram more thoroughly after we refactor StreamsPartitionAssignor to support COOPERATIVE
+        CREATED(1, 5), STARTING(2, 3, 5), PARTITIONS_REVOKED(3, 5), PARTITIONS_ASSIGNED(2, 3, 4, 5), RUNNING(2, 3, 5), PENDING_SHUTDOWN(6), DEAD;
 
         private final Set<Integer> validTransitions = new HashSet<>();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 772c91d..4ddab57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -79,11 +79,10 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
         streams.setStreamThreadStateListener(listener);
         streams.start();
 
-        TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN");
+        TestUtils.waitForCondition(listener::transitToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN");
 
         streams.close();
-        assertTrue(listener.createdToRevokedSeen());
-        assertTrue(listener.revokedToPendingShutdownSeen());
+        assertTrue(listener.transitToPendingShutdownSeen());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 46515aa..a0a61b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -79,25 +79,18 @@ public class IntegrationTestUtils {
      * Records state transition for StreamThread
      */
     public static class StateListenerStub implements StreamThread.StateListener {
-        boolean startingToRevokedSeen = false;
-        boolean revokedToPendingShutdownSeen = false;
+        boolean toPendingShutdownSeen = false;
         @Override
         public void onChange(final Thread thread,
                              final ThreadStateTransitionValidator newState,
                              final ThreadStateTransitionValidator oldState) {
-            if (oldState == StreamThread.State.STARTING && newState == StreamThread.State.PARTITIONS_REVOKED) {
-                startingToRevokedSeen = true;
-            } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && newState == StreamThread.State.PENDING_SHUTDOWN) {
-                revokedToPendingShutdownSeen = true;
+            if (newState == StreamThread.State.PENDING_SHUTDOWN) {
+                toPendingShutdownSeen = true;
             }
         }
 
-        public boolean revokedToPendingShutdownSeen() {
-            return revokedToPendingShutdownSeen;
-        }
-
-        public boolean createdToRevokedSeen() {
-            return startingToRevokedSeen;
+        public boolean transitToPendingShutdownSeen() {
+            return toPendingShutdownSeen;
         }
     }
 

