kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-4631; Request metadata in consumer if topic/partitions unavailable
Date Thu, 16 Mar 2017 22:02:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 928789bf7 -> 94938f373


KAFKA-4631; Request metadata in consumer if topic/partitions unavailable

If the leader node of one or more partitions in a consumer subscription is temporarily unavailable, request a metadata refresh so that partitions skipped during assignment don't have to wait for metadata expiry before reassignment. A metadata refresh is also requested if a subscribed topic or an assigned partition doesn't exist.
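
In short, the consumer's metadata listener now receives the set of unavailable topics on each update and can trigger another refresh. A minimal sketch of that pattern (assuming the updated Metadata.Listener signature introduced by this patch, and mirroring the ConsumerCoordinator change in the diff below):

    metadata.addListener(new Metadata.Listener() {
        @Override
        public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
            // Request another refresh if any topic we track is missing or has a
            // partition whose leader is unknown, instead of waiting for expiry.
            if (!Collections.disjoint(metadata.topics(), unavailableTopics))
                metadata.requestUpdate();
        }
    });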

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #2622 from rajinisivaram/KAFKA-4631


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94938f37
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94938f37
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94938f37

Branch: refs/heads/0.10.2
Commit: 94938f3734c71c69fe14612052ca16ecc0205d4d
Parents: 928789b
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Thu Mar 2 17:49:01 2017 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Mar 16 14:45:24 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java | 18 +++-
 .../org/apache/kafka/clients/NetworkClient.java |  2 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  2 +-
 .../consumer/internals/ConsumerCoordinator.java |  6 +-
 .../kafka/clients/producer/KafkaProducer.java   |  2 +-
 .../kafka/common/requests/MetadataResponse.java | 24 +++++
 .../org/apache/kafka/clients/MetadataTest.java  | 66 +++++++-------
 .../org/apache/kafka/clients/MockClient.java    | 33 ++++---
 .../apache/kafka/clients/NetworkClientTest.java |  2 +-
 .../clients/consumer/KafkaConsumerTest.java     | 38 ++++----
 .../internals/AbstractCoordinatorTest.java      |  2 +-
 .../internals/ConsumerCoordinatorTest.java      | 94 ++++++++++++++++----
 .../clients/consumer/internals/FetcherTest.java |  4 +-
 .../clients/producer/internals/SenderTest.java  | 14 +--
 .../runtime/distributed/WorkerGroupMember.java  |  3 +-
 .../distributed/WorkerCoordinatorTest.java      |  2 +-
 .../main/scala/kafka/admin/AdminClient.scala    |  2 +-
 .../kafka/api/ConsumerBounceTest.scala          | 59 ++++++++++--
 .../processor/internals/StreamsKafkaClient.java |  4 +-
 19 files changed, 270 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 75d48ab..428684d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -195,8 +195,13 @@ public final class Metadata {
     /**
      * Updates the cluster metadata. If topic expiry is enabled, expiry time
      * is set for topics if required and expired topics are removed from the metadata.
+     *
+     * @param cluster the cluster containing metadata for topics with valid metadata
+     * @param unavailableTopics topics which are non-existent or have one or more partitions whose
+     *        leader is not known
+     * @param now current time in milliseconds
      */
-    public synchronized void update(Cluster cluster, long now) {
+    public synchronized void update(Cluster cluster, Set<String> unavailableTopics, long now) {
         Objects.requireNonNull(cluster, "cluster should not be null");
 
         this.needUpdate = false;
@@ -219,7 +224,7 @@ public final class Metadata {
         }
 
         for (Listener listener: listeners)
-            listener.onMetadataUpdate(cluster);
+            listener.onMetadataUpdate(cluster, unavailableTopics);
 
         String previousClusterId = cluster.clusterResource().clusterId();
 
@@ -302,7 +307,14 @@ public final class Metadata {
      * MetadataUpdate Listener
      */
     public interface Listener {
-        void onMetadataUpdate(Cluster cluster);
+        /**
+         * Callback invoked on metadata update.
+         *
+         * @param cluster the cluster containing metadata for topics with valid metadata
+         * @param unavailableTopics topics which are non-existent or have one or more partitions whose
+         *        leader is not known
+         */
+        void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics);
     }
 
     private synchronized void requestUpdateForNewTopics() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3a75288..1e8dbd3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -709,7 +709,7 @@ public class NetworkClient implements KafkaClient {
             // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
             // created which means we will get errors and no nodes until it exists
             if (cluster.nodes().size() > 0) {
-                this.metadata.update(cluster, now);
+                this.metadata.update(cluster, response.unavailableTopics(), now);
             } else {
                 log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                 this.metadata.failedUpdate(now);

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index eac9579..64d64ba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -652,7 +652,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), 0);
+            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "consumer";
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient netClient = new NetworkClient(

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8669527..2e37636 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -168,7 +169,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private void addMetadataListener() {
         this.metadata.addListener(new Metadata.Listener() {
             @Override
-            public void onMetadataUpdate(Cluster cluster) {
+            public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
                 // if we encounter any unauthorized topics, raise an exception to the user
                 if (!cluster.unauthorizedTopics().isEmpty())
                     throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
@@ -182,6 +183,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     if (!snapshot.equals(metadataSnapshot))
                         metadataSnapshot = snapshot;
                 }
+
+                if (!Collections.disjoint(metadata.topics(), unavailableTopics))
+                    metadata.requestUpdate();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 29defbb..6f1a3b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -301,7 +301,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     time);
 
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
+            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient client = new NetworkClient(
                     new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index a8baee5..7209db5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -15,6 +15,7 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -262,6 +263,29 @@ public class MetadataResponse extends AbstractResponse {
     }
 
     /**
+     * Returns the set of topics with an error indicating invalid metadata
+     * and topics with any partition whose error indicates invalid metadata.
+     * This includes all non-existent topics specified in the metadata request
+     * and any topic returned with one or more partitions whose leader is not known.
+     */
+    public Set<String> unavailableTopics() {
+        Set<String> invalidMetadataTopics = new HashSet<>();
+        for (TopicMetadata topicMetadata : this.topicMetadata) {
+            if (topicMetadata.error.exception() instanceof InvalidMetadataException)
+                invalidMetadataTopics.add(topicMetadata.topic);
+            else {
+                for (PartitionMetadata partitionMetadata : topicMetadata.partitionMetadata) {
+                    if (partitionMetadata.error.exception() instanceof InvalidMetadataException) {
+                        invalidMetadataTopics.add(topicMetadata.topic);
+                        break;
+                    }
+                }
+            }
+        }
+        return invalidMetadataTopics;
+    }
+
+    /**
      * Get a snapshot of the cluster metadata from this response
      * @return the cluster snapshot
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index cfd2a94..db88b77 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -52,7 +52,7 @@ public class MetadataTest {
     @Test
     public void testMetadata() throws Exception {
         long time = 0;
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         metadata.requestUpdate();
         assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
@@ -67,7 +67,7 @@ public class MetadataTest {
         // This simulates the metadata update sequence in KafkaProducer
         while (t1.isAlive() || t2.isAlive()) {
             if (metadata.timeToNextUpdate(time) == 0) {
-                metadata.update(TestUtils.singletonCluster(topic, 1), time);
+                metadata.update(TestUtils.singletonCluster(topic, 1), Collections.<String>emptySet(), time);
                 time += refreshBackoffMs;
             }
             Thread.sleep(1);
@@ -97,7 +97,7 @@ public class MetadataTest {
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // lastSuccessfulRefreshMs updated to now.
-        metadata.update(Cluster.empty(), now);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
 
         // The last update was successful so the remaining time to expire the current metadata should be returned.
         assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
@@ -108,7 +108,7 @@ public class MetadataTest {
         assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
 
         // Reset needUpdate to false.
-        metadata.update(Cluster.empty(), now);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
         assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
 
         // Both metadataExpireMs and refreshBackoffMs elapsed.
@@ -152,13 +152,13 @@ public class MetadataTest {
         long now = 10000;
 
         // New topic added to fetch set and update requested. It should allow immediate update.
-        metadata.update(Cluster.empty(), now);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
         metadata.add("new-topic");
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // Even though setTopics called, immediate update isn't necessary if the new topic set isn't
         // containing a new topic,
-        metadata.update(Cluster.empty(), now);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
         metadata.setTopics(metadata.topics());
         assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
 
@@ -167,12 +167,12 @@ public class MetadataTest {
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // If metadata requested for all topics it should allow immediate update.
-        metadata.update(Cluster.empty(), now);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
         metadata.needMetadataForAllTopics(true);
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // However if metadata is already capable to serve all topics it shouldn't override backoff.
-        metadata.update(Cluster.empty(), now);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
         metadata.needMetadataForAllTopics(true);
         assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
     }
@@ -187,7 +187,7 @@ public class MetadataTest {
     @Test
     public void testMetadataUpdateWaitTime() throws Exception {
         long time = 0;
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         // first try with a max wait time of 0 and ensure that this returns back without waiting forever
         try {
@@ -209,7 +209,7 @@ public class MetadataTest {
     @Test
     public void testFailedUpdate() {
         long time = 100;
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
 
         assertEquals(100, metadata.timeToNextUpdate(1000));
         metadata.failedUpdate(1100);
@@ -218,14 +218,14 @@ public class MetadataTest {
         assertEquals(100, metadata.lastSuccessfulUpdate());
 
         metadata.needMetadataForAllTopics(true);
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         assertEquals(100, metadata.timeToNextUpdate(1000));
     }
 
     @Test
     public void testUpdateWithNeedMetadataForAllTopics() {
         long time = 0;
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         metadata.needMetadataForAllTopics(true);
 
         final List<String> expectedTopics = Collections.singletonList("topic");
@@ -237,7 +237,7 @@ public class MetadataTest {
                     new PartitionInfo("topic1", 0, null, null, null)),
                 Collections.<String>emptySet(),
                 Collections.<String>emptySet()),
-            100);
+            Collections.<String>emptySet(), 100);
 
         assertArrayEquals("Metadata got updated with wrong set of topics.",
             expectedTopics.toArray(), metadata.topics().toArray());
@@ -255,7 +255,7 @@ public class MetadataTest {
 
         String hostName = "www.example.com";
         Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002)));
-        metadata.update(cluster, time);
+        metadata.update(cluster, Collections.<String>emptySet(), time);
         assertFalse("ClusterResourceListener should not called when metadata is updated with bootstrap Cluster",
                 MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
 
@@ -267,7 +267,7 @@ public class MetadataTest {
                                 new PartitionInfo("topic1", 0, null, null, null)),
                         Collections.<String>emptySet(),
                         Collections.<String>emptySet()),
-                100);
+                Collections.<String>emptySet(), 100);
 
         assertEquals("MockClusterResourceListener did not get cluster metadata correctly",
                 "dummy", mockClusterListener.clusterResource().clusterId());
@@ -279,10 +279,10 @@ public class MetadataTest {
     public void testListenerGetsNotifiedOfUpdate() {
         long time = 0;
         final Set<String> topics = new HashSet<>();
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         metadata.addListener(new Metadata.Listener() {
             @Override
-            public void onMetadataUpdate(Cluster cluster) {
+            public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
                 topics.clear();
                 topics.addAll(cluster.topics());
             }
@@ -296,7 +296,7 @@ public class MetadataTest {
                     new PartitionInfo("topic1", 0, null, null, null)),
                 Collections.<String>emptySet(),
                 Collections.<String>emptySet()),
-            100);
+            Collections.<String>emptySet(), 100);
 
         assertEquals("Listener did not update topics list correctly",
             new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@@ -306,10 +306,10 @@ public class MetadataTest {
     public void testListenerCanUnregister() {
         long time = 0;
         final Set<String> topics = new HashSet<>();
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         final Metadata.Listener listener = new Metadata.Listener() {
             @Override
-            public void onMetadataUpdate(Cluster cluster) {
+            public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
                 topics.clear();
                 topics.addAll(cluster.topics());
             }
@@ -324,7 +324,7 @@ public class MetadataTest {
                     new PartitionInfo("topic1", 0, null, null, null)),
                 Collections.<String>emptySet(),
                 Collections.<String>emptySet()),
-            100);
+            Collections.<String>emptySet(), 100);
 
         metadata.removeListener(listener);
 
@@ -336,7 +336,7 @@ public class MetadataTest {
                     new PartitionInfo("topic3", 0, null, null, null)),
                 Collections.<String>emptySet(),
                 Collections.<String>emptySet()),
-            100);
+            Collections.<String>emptySet(), 100);
 
         assertEquals("Listener did not update topics list correctly",
             new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@@ -349,17 +349,17 @@ public class MetadataTest {
         // Test that topic is expired if not used within the expiry interval
         long time = 0;
         metadata.add("topic1");
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         time += Metadata.TOPIC_EXPIRY_MS;
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         assertFalse("Unused topic not expired", metadata.containsTopic("topic1"));
 
         // Test that topic is not expired if used within the expiry interval
         metadata.add("topic2");
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         for (int i = 0; i < 3; i++) {
             time += Metadata.TOPIC_EXPIRY_MS / 2;
-            metadata.update(Cluster.empty(), time);
+            metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
             assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
             metadata.add("topic2");
         }
@@ -368,9 +368,9 @@ public class MetadataTest {
         HashSet<String> topics = new HashSet<>();
         topics.add("topic4");
         metadata.setTopics(topics);
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         time += Metadata.TOPIC_EXPIRY_MS;
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         assertFalse("Unused topic not expired", metadata.containsTopic("topic4"));
     }
 
@@ -381,17 +381,17 @@ public class MetadataTest {
         // Test that topic is not expired if not used within the expiry interval
         long time = 0;
         metadata.add("topic1");
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         time += Metadata.TOPIC_EXPIRY_MS;
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic1"));
 
         // Test that topic is not expired if used within the expiry interval
         metadata.add("topic2");
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         for (int i = 0; i < 3; i++) {
             time += Metadata.TOPIC_EXPIRY_MS / 2;
-            metadata.update(Cluster.empty(), time);
+            metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
             assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
             metadata.add("topic2");
         }
@@ -401,7 +401,7 @@ public class MetadataTest {
         topics.add("topic4");
         metadata.setTopics(topics);
         time += metadataExpireMs * 2;
-        metadata.update(Cluster.empty(), time);
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
         assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 50ed131..df29f31 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -26,6 +26,7 @@ import org.apache.kafka.test.TestUtils;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -63,6 +64,7 @@ public class MockClient implements KafkaClient {
 
     private final Time time;
     private final Metadata metadata;
+    private Set<String> unavailableTopics;
     private int correlation = 0;
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
@@ -72,17 +74,17 @@ public class MockClient implements KafkaClient {
     // Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
     private final Queue<ClientResponse> responses = new ConcurrentLinkedDeque<>();
     private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
-    private final Queue<Cluster> metadataUpdates = new ArrayDeque<>();
+    private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
 
     public MockClient(Time time) {
-        this.time = time;
-        this.metadata = null;
+        this(time, null);
     }
 
     public MockClient(Time time, Metadata metadata) {
         this.time = time;
         this.metadata = metadata;
+        this.unavailableTopics = Collections.emptySet();
     }
 
     @Override
@@ -167,11 +169,13 @@ public class MockClient implements KafkaClient {
         List<ClientResponse> copy = new ArrayList<>(this.responses);
 
         if (metadata != null && metadata.updateRequested()) {
-            Cluster cluster = metadataUpdates.poll();
-            if (cluster == null)
-                metadata.update(metadata.fetch(), time.milliseconds());
-            else
-                metadata.update(cluster, time.milliseconds());
+            MetadataUpdate metadataUpdate = metadataUpdates.poll();
+            if (metadataUpdate == null)
+                metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
+            else {
+                this.unavailableTopics = metadataUpdate.unavailableTopics;
+                metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
+            }
         }
 
         while (!this.responses.isEmpty()) {
@@ -277,8 +281,8 @@ public class MockClient implements KafkaClient {
         metadataUpdates.clear();
     }
 
-    public void prepareMetadataUpdate(Cluster cluster) {
-        metadataUpdates.add(cluster);
+    public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
+        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics));
     }
 
     public void setNode(Node node) {
@@ -339,4 +343,13 @@ public class MockClient implements KafkaClient {
     public void setNodeApiVersions(NodeApiVersions nodeApiVersions) {
         this.nodeApiVersions = nodeApiVersions;
     }
+
+    private static class MetadataUpdate {
+        final Cluster cluster;
+        final Set<String> unavailableTopics;
+        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
+            this.cluster = cluster;
+            this.unavailableTopics = unavailableTopics;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index deaf2cc..a95af83 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -78,7 +78,7 @@ public class NetworkClientTest {
 
     @Before
     public void setup() {
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
     }
 
     @Test(expected = IllegalStateException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4aaa172..f2905a9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -364,7 +364,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -405,7 +405,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -446,7 +446,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -482,7 +482,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -530,7 +530,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -591,7 +591,7 @@ public class KafkaConsumerTest {
 
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
 
-        client.prepareMetadataUpdate(cluster);
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
         consumer.poll(0);
         assertEquals(singleton(topic), consumer.subscription());
@@ -622,7 +622,7 @@ public class KafkaConsumerTest {
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
 
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
@@ -630,14 +630,14 @@ public class KafkaConsumerTest {
         Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
 
-        client.prepareMetadataUpdate(cluster);
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
         consumer.poll(0);
         assertEquals(singleton(topic), consumer.subscription());
 
         consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));
 
-        client.prepareMetadataUpdate(cluster);
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
         prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator);
         consumer.poll(0);
@@ -660,7 +660,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -705,7 +705,7 @@ public class KafkaConsumerTest {
         final Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         final MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -745,7 +745,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -793,7 +793,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -913,7 +913,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -981,7 +981,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -1046,7 +1046,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -1107,7 +1107,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -1226,7 +1226,7 @@ public class KafkaConsumerTest {
         Node node = cluster.nodes().get(0);
 
         Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
         client.setNode(node);
@@ -1238,7 +1238,7 @@ public class KafkaConsumerTest {
         consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
 
-        client.prepareMetadataUpdate(cluster);
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
         // Poll with responses
         client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node);

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index bb617ae..8846b5e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -79,7 +79,7 @@ public class AbstractCoordinatorTest {
         Metrics metrics = new Metrics();
 
         Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        metadata.update(cluster, mockTime.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), mockTime.milliseconds());
         this.node = cluster.nodes().get(0);
         mockClient.setNode(node);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index e11bf30..66fe76d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -120,7 +120,7 @@ public class ConsumerCoordinatorTest {
         this.time = new MockTime();
         this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
         this.metadata = new Metadata(0, Long.MAX_VALUE);
-        this.metadata.update(cluster, time.milliseconds());
+        this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         this.client = new MockClient(time, metadata);
         this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
@@ -292,7 +292,7 @@ public class ConsumerCoordinatorTest {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -310,7 +310,7 @@ public class ConsumerCoordinatorTest {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -349,7 +349,7 @@ public class ConsumerCoordinatorTest {
         // partially update the metadata with one topic first,
         // let the leader to refresh metadata during assignment
         metadata.setTopics(singletonList(topic1));
-        metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
+        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -369,7 +369,7 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE.code()));
         // expect client to force updating the metadata, if yes gives it both topics
-        client.prepareMetadataUpdate(cluster);
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
         coordinator.poll(time.milliseconds());
 
@@ -389,7 +389,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
         metadata.needMetadataForAllTopics(true);
-        metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
+        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
 
         assertEquals(singleton(topic1), subscriptions.subscription());
 
@@ -410,7 +410,7 @@ public class ConsumerCoordinatorTest {
                 final Map<String, Integer> updatedPartitions = new HashMap<>();
                 for (String topic : updatedSubscription)
                     updatedPartitions.put(topic, 1);
-                metadata.update(TestUtils.clusterWith(1, updatedPartitions), time.milliseconds());
+                metadata.update(TestUtils.clusterWith(1, updatedPartitions), Collections.<String>emptySet(), time.milliseconds());
                 return true;
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
@@ -453,7 +453,7 @@ public class ConsumerCoordinatorTest {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -524,7 +524,7 @@ public class ConsumerCoordinatorTest {
         // partially update the metadata with one topic first,
         // let the leader to refresh metadata during assignment
         metadata.setTopics(singletonList(topic1));
-        metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
+        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -541,7 +541,7 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE.code()));
         // expect client to force updating the metadata, if yes gives it both topics
-        client.prepareMetadataUpdate(cluster);
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
         coordinator.joinGroupIfNeeded();
 
@@ -712,7 +712,7 @@ public class ConsumerCoordinatorTest {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -731,7 +731,7 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.needRejoin());
 
         // a new partition is added to the topic
-        metadata.update(TestUtils.singletonCluster(topic1, 2), time.milliseconds());
+        metadata.update(TestUtils.singletonCluster(topic1, 2), Collections.<String>emptySet(), time.milliseconds());
 
         // we should detect the change and ask for reassignment
         assertTrue(coordinator.needRejoin());
@@ -751,7 +751,7 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(topics);
 
         // we only have metadata for one topic initially
-        metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
+        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
@@ -772,7 +772,7 @@ public class ConsumerCoordinatorTest {
                     Map<String, Integer> topicPartitionCounts = new HashMap<>();
                     topicPartitionCounts.put(topic1, 1);
                     topicPartitionCounts.put(topic2, 1);
-                    metadata.update(TestUtils.singletonCluster(topicPartitionCounts), time.milliseconds());
+                    metadata.update(TestUtils.singletonCluster(topicPartitionCounts), Collections.<String>emptySet(), time.milliseconds());
                     return true;
                 }
                 return false;
@@ -789,12 +789,72 @@ public class ConsumerCoordinatorTest {
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
     }
 
+    @Test
+    public void testRebalanceAfterTopicUnavailableWithSubscribe() {
+        unavailableTopicTest(false, false, Collections.<String>emptySet());
+    }
+
+    @Test
+    public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
+        unavailableTopicTest(true, false, Collections.<String>emptySet());
+    }
+
+    @Test
+    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSSubscribe() {
+        unavailableTopicTest(true, false, Collections.<String>singleton("notmatching"));
+    }
+
+    @Test
+    public void testAssignWithTopicUnavailable() {
+        unavailableTopicTest(true, false, Collections.<String>emptySet());
+    }
+
+    private void unavailableTopicTest(boolean patternSubscribe, boolean assign, Set<String> unavailableTopicsInLastMetadata) {
+        final String consumerId = "consumer";
+
+        metadata.setTopics(singletonList(topic1));
+        client.prepareMetadataUpdate(Cluster.empty(), Collections.singleton("test1"));
+
+        if (assign)
+            subscriptions.assignFromUser(singleton(t1p));
+        else if (patternSubscribe)
+            subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
+        else
+            subscriptions.subscribe(singleton(topic1), rebalanceListener);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorReady();
+
+        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
+        partitionAssignor.prepare(Collections.<String, List<TopicPartition>>emptyMap());
+
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE.code()));
+        coordinator.poll(time.milliseconds());
+        if (!assign) {
+            assertFalse(coordinator.needRejoin());
+            assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
+        }
+        assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
+
+        client.prepareMetadataUpdate(cluster, unavailableTopicsInLastMetadata);
+        client.poll(0, time.milliseconds());
+        client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        coordinator.poll(time.milliseconds());
+
+        assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested());
+        if (!assign) {
+            assertFalse(coordinator.needRejoin());
+            assertEquals(singleton(t1p), rebalanceListener.assigned);
+        }
+    }
 
     @Test
     public void testExcludeInternalTopicsConfigOption() {
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
-        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
 
         assertFalse(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
     }
@@ -804,7 +864,7 @@ public class ConsumerCoordinatorTest {
         coordinator = buildCoordinator(new Metrics(), assignors, false, false);
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
-        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
+        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
 
         assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
     }
@@ -1030,7 +1090,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
 
-        client.prepareMetadataUpdate(cluster);
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
         coordinator.joinGroupIfNeeded();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ecaade2..4d388e6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -108,7 +108,7 @@ public class FetcherTest {
 
     @Before
     public void setup() throws Exception {
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         client.setNode(node);
 
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
@@ -837,7 +837,7 @@ public class FetcherTest {
         TopicPartition tp1 = new TopicPartition(topicName, 1);
         // Ensure metadata has both partition.
         Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         // First try should fail due to metadata error.
         client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 7f5fe15..a3dae66 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -84,7 +84,7 @@ public class SenderTest {
                             time,
                             REQUEST_TIMEOUT);
 
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
     }
 
     @After
@@ -197,7 +197,7 @@ public class SenderTest {
 
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
-            metadata.update(cluster1, time.milliseconds());
+            metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
 
             // Send the first message.
             TopicPartition tp2 = new TopicPartition("test", 1);
@@ -216,7 +216,7 @@ public class SenderTest {
 
             // Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
             Cluster cluster2 = TestUtils.singletonCluster("test", 2);
-            metadata.update(cluster2, time.milliseconds());
+            metadata.update(cluster2, Collections.<String>emptySet(), time.milliseconds());
             // Sender should not send the second message to node 0.
             sender.run(time.milliseconds());
             assertEquals(1, client.inFlightRequestCount());
@@ -232,12 +232,12 @@ public class SenderTest {
     @Test
     public void testMetadataTopicExpiry() throws Exception {
         long offset = 0;
-        metadata.update(Cluster.empty(), time.milliseconds());
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
 
         Future<RecordMetadata> future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         sender.run(time.milliseconds());  // send produce request
         client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
         sender.run(time.milliseconds());
@@ -247,12 +247,12 @@ public class SenderTest {
 
         assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp.topic()));
         time.sleep(Metadata.TOPIC_EXPIRY_MS);
-        metadata.update(Cluster.empty(), time.milliseconds());
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
         assertFalse("Unused topic has not been expired", metadata.containsTopic(tp.topic()));
         future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
-        metadata.update(cluster, time.milliseconds());
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         sender.run(time.milliseconds());  // send produce request
         client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
         sender.run(time.milliseconds());
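
The extra argument matters on the consumer side: when a refresh reports a subscribed topic or an assigned partition's leader as unavailable, the client can ask for another refresh right away instead of sitting on stale metadata until the next scheduled update. An illustrative callback only (the signature and the subscriptions field are assumed for the sketch, not quoted from the patch):

    // Sketch: react to a metadata update that flags subscribed topics as unavailable.
    // `subscriptions` (a SubscriptionState) and `metadata` are assumed surrounding fields.
    void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
        for (String topic : subscriptions.subscription()) {
            if (unavailableTopics.contains(topic) || !cluster.topics().contains(topic)) {
                // Request a refresh now rather than waiting for metadata.max.age.ms to elapse.
                metadata.requestUpdate();
                break;
            }
        }
    }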

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index ac13472..f6ff665 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -88,7 +89,7 @@ public class WorkerGroupMember {
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), 0);
+            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "connect";
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient netClient = new NetworkClient(

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 92393a1..282e175 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -91,7 +91,7 @@ public class WorkerCoordinatorTest {
         this.time = new MockTime();
         this.client = new MockClient(time);
         this.metadata = new Metadata(0, Long.MAX_VALUE);
-        this.metadata.update(cluster, time.milliseconds());
+        this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 680c5e1..6ccfe94 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -231,7 +231,7 @@ object AdminClient {
     val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
     val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
     val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
-    metadata.update(bootstrapCluster, 0)
+    metadata.update(bootstrapCluster, Collections.emptySet(), 0)
 
     val selector = new Selector(
       DefaultConnectionMaxIdleMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index e848e28..4ec77a1 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -43,12 +43,13 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
 
   // Time to process commit and leave group requests in tests when brokers are available
   val gracefulCloseTimeMs = 1000
-  val executor = Executors.newFixedThreadPool(2)
+  val executor = Executors.newScheduledThreadPool(2)
 
   // configure the servers and clients
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
+  this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -161,6 +162,52 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
+  def testSubscribeWhenTopicUnavailable() {
+    val numRecords = 1000
+    val newtopic = "newtopic"
+
+    val consumer = this.consumers.head
+    consumer.subscribe(Collections.singleton(newtopic))
+    executor.schedule(new Runnable {
+        def run() = TestUtils.createTopic(zkUtils, newtopic, serverCount, serverCount, servers)
+      }, 2, TimeUnit.SECONDS)
+    consumer.poll(0)
+
+    def sendRecords(numRecords: Int, topic: String = this.topic) {
+      var remainingRecords = numRecords
+      val endTimeMs = System.currentTimeMillis + 20000
+      while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs) {
+        val futures = (0 until remainingRecords).map { i =>
+          this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+        }
+        futures.map { future =>
+          try {
+            future.get
+            remainingRecords -= 1
+          } catch {
+            case _: Exception =>
+          }
+        }
+      }
+      assertEquals(0, remainingRecords)
+    }
+
+    sendRecords(numRecords, newtopic)
+    receiveRecords(consumer, numRecords, newtopic, 10000)
+
+    servers.foreach(server => killBroker(server.config.brokerId))
+    Thread.sleep(500)
+    restartDeadBrokers()
+
+    val future = executor.submit(new Runnable {
+      def run() = receiveRecords(consumer, numRecords, newtopic, 10000)
+    })
+    sendRecords(numRecords, newtopic)
+    future.get
+  }
+
+
+  @Test
   def testClose() {
     val numRecords = 10
     sendRecords(numRecords)
@@ -311,10 +358,12 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     consumer
   }
 
-  private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) {
-    var received = 0
-    while (received < numRecords)
+  private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, topic: String = this.topic, timeoutMs: Long = 60000) {
+    var received = 0L
+    val endTimeMs = System.currentTimeMillis + timeoutMs
+    while (received < numRecords && System.currentTimeMillis < endTimeMs)
       received += consumer.poll(1000).count()
+    assertEquals(numRecords, received)
   }
 
   private def submitCloseAndValidate(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
@@ -372,7 +421,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     }
   }
 
-  private def sendRecords(numRecords: Int) {
+  private def sendRecords(numRecords: Int, topic: String = this.topic) {
     val futures = (0 until numRecords).map { i =>
       this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
     }
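
The new ConsumerBounceTest case above drives this from the public API: the consumer subscribes before the topic exists, and the topic is only created a couple of seconds later. A hedged sketch of the same scenario as a standalone Java consumer, with placeholder connection and serializer settings that are not taken from the patch:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder address
    props.put("group.id", "my-test");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    // Subscribe to a topic that may not exist yet; the test's topic is "newtopic".
    consumer.subscribe(Collections.singleton("newtopic"));
    ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
    while (records.isEmpty()) {
        // Each poll gives the client a chance to refresh metadata; once the topic exists
        // and a leader is available, partitions are assigned and records start flowing.
        records = consumer.poll(1000);
    }
    consumer.close();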

http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 94d8854..60dd416 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -95,7 +95,7 @@ public class StreamsKafkaClient {
             streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG)
         );
         final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
+        metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
 
         final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
                 .timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
@@ -223,7 +223,7 @@ public class StreamsKafkaClient {
             streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
             streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
         final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        metadata.update(Cluster.bootstrap(addresses), Time.SYSTEM.milliseconds());
+        metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), Time.SYSTEM.milliseconds());
 
         final List<Node> nodes = metadata.fetch().nodes();
         return ensureOneNodeIsReady(nodes);

