kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5291; AdminClient should not trigger auto creation of topics
Date Thu, 01 Jun 2017 00:00:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 647afeff6 -> 7311dcbc5


KAFKA-5291; AdminClient should not trigger auto creation of topics

- Added a boolean `allow_auto_topic_creation` field to MetadataRequest and
bumped the protocol version to V4 (a usage sketch follows these notes).

- When connecting to brokers older than 0.11.0.0, the `allow_auto_topic_creation`
field cannot be expressed in the request, so we fall back to a metadata request
for all topics (which never triggers auto-creation) to keep the behavior consistent.

- Set `allow_auto_topic_creation` to false in the new AdminClient and
StreamsKafkaClient (which exists specifically to create topics
manually); set it to true everywhere else for now. Other clients will eventually
rely on client-side auto topic creation, but that support doesn't exist yet.

- Added an `allowAutoTopicCreation` field to `Metadata`, which is used by
`DefaultMetadataUpdater`. This is not strictly needed for the new
`AdminClient`, but it avoids surprises if the `AdminClient` ever adds a topic
to `Metadata` via `setTopics` or `addTopic`.
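
As a minimal sketch of the resulting builder behaviour (the topic name is
illustrative; the classes are the ones touched by this patch), a caller that
opts out of auto-creation can only build v4+ requests:

import java.util.Collections;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.MetadataRequest;

public class MetadataRequestV4Sketch {
    public static void main(String[] args) {
        // Opting out of broker-side auto topic creation requires v4 or newer.
        MetadataRequest.Builder noAutoCreate =
                new MetadataRequest.Builder(Collections.singletonList("some-topic"), false);
        noAutoCreate.build((short) 4); // OK: carries allow_auto_topic_creation=false

        try {
            noAutoCreate.build((short) 3); // v3 has no such field
        } catch (UnsupportedVersionException e) {
            // Callers like KafkaAdminClient#describeTopics catch this and fall
            // back to an all-topics request, which never auto-creates topics.
        }

        // Builders that allow auto-creation still build against older versions.
        new MetadataRequest.Builder(Collections.singletonList("some-topic"), true)
                .build((short) 1);
    }
}

On the fallback path the request behaves exactly like a pre-v4 all-topics
request, which is why `Builder.allTopics()` sets the flag to true.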

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3098 from ijuma/kafka-5291-admin-client-no-auto-topic-creation


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7311dcbc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7311dcbc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7311dcbc

Branch: refs/heads/trunk
Commit: 7311dcbc5363d59dd0a0de86518dd12a7b259ca4
Parents: 647afef
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Jun 1 01:00:11 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Jun 1 01:00:11 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java | 22 +++++-----
 .../org/apache/kafka/clients/NetworkClient.java |  3 +-
 .../apache/kafka/clients/admin/AdminClient.java |  6 ---
 .../kafka/clients/admin/KafkaAdminClient.java   | 22 ++++++++--
 .../kafka/clients/consumer/KafkaConsumer.java   |  5 ++-
 .../kafka/clients/producer/KafkaProducer.java   |  3 +-
 .../apache/kafka/common/protocol/Protocol.java  | 32 ++++++++++-----
 .../kafka/common/requests/MetadataRequest.java  | 39 +++++++++++++-----
 .../org/apache/kafka/clients/MetadataTest.java  | 12 +++---
 .../apache/kafka/clients/NetworkClientTest.java |  8 ++--
 .../clients/admin/MockKafkaAdminClientEnv.java  |  2 +-
 .../clients/consumer/KafkaConsumerTest.java     | 36 +++++++++--------
 .../internals/AbstractCoordinatorTest.java      |  2 +-
 .../internals/ConsumerCoordinatorTest.java      |  2 +-
 .../internals/ConsumerNetworkClientTest.java    |  2 +-
 .../clients/consumer/internals/FetcherTest.java | 14 +++----
 .../clients/producer/KafkaProducerTest.java     |  3 +-
 .../clients/producer/internals/SenderTest.java  |  2 +-
 .../internals/TransactionManagerTest.java       |  2 +-
 .../common/requests/RequestResponseTest.java    |  6 ++-
 .../authenticator/SaslAuthenticatorTest.java    | 11 +++--
 .../runtime/distributed/WorkerGroupMember.java  |  2 +-
 .../distributed/WorkerCoordinatorTest.java      |  2 +-
 .../main/scala/kafka/admin/AdminClient.scala    |  4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 10 +++--
 .../kafka/api/AuthorizerIntegrationTest.scala   | 10 +++--
 .../api/KafkaAdminClientIntegrationTest.scala   | 31 +++++++++++----
 .../AbstractCreateTopicsRequestTest.scala       |  4 +-
 .../kafka/server/DeleteTopicsRequestTest.scala  |  2 +-
 .../unit/kafka/server/MetadataRequestTest.scala | 42 +++++++++++++++++---
 .../unit/kafka/server/RequestQuotaTest.scala    |  2 +-
 .../processor/internals/StreamsKafkaClient.java |  8 ++--
 32 files changed, 226 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 9ff629d..0963bad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -66,17 +66,11 @@ public final class Metadata {
     private final List<Listener> listeners;
     private final ClusterResourceListeners clusterResourceListeners;
     private boolean needMetadataForAllTopics;
+    private final boolean allowAutoTopicCreation;
     private final boolean topicExpiryEnabled;
 
-    /**
-     * Create a metadata instance with reasonable defaults
-     */
-    public Metadata() {
-        this(100L, 60 * 60 * 1000L);
-    }
-
-    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
-        this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
+    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) {
+        this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
     }
 
     /**
@@ -84,12 +78,16 @@ public final class Metadata {
      * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
      *        polling
      * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+     * @param allowAutoTopicCreation If this and the broker config 'auto.create.topics.enable' are true, topics that
+     *                               don't exist will be created by the broker when a metadata request is sent
      * @param topicExpiryEnabled If true, enable expiry of unused topics
      * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
      */
-    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
+    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
+                    boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
+        this.allowAutoTopicCreation = allowAutoTopicCreation;
         this.topicExpiryEnabled = topicExpiryEnabled;
         this.lastRefreshMs = 0L;
         this.lastSuccessfulRefreshMs = 0L;
@@ -275,6 +273,10 @@ public final class Metadata {
         return this.lastSuccessfulRefreshMs;
     }
 
+    public boolean allowAutoTopicCreation() {
+        return allowAutoTopicCreation;
+    }
+
     /**
      * Set state to indicate if metadata for all topics in Kafka cluster is required or not.
      * @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster.

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index bfd0eb5..1d4fe58 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -829,7 +829,8 @@ public class NetworkClient implements KafkaClient {
                 if (metadata.needMetadataForAllTopics())
                     metadataRequest = MetadataRequest.Builder.allTopics();
                 else
-                    metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()));
+                    metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
+                            metadata.allowAutoTopicCreation());
 
 
                 log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 8ae3249..73ea754 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -167,12 +167,6 @@ public abstract class AdminClient implements AutoCloseable {
     /**
      * Describe some topics in the cluster.
      *
-     * Note that if auto.create.topics.enable is true on the brokers,
-     * describeTopics(topicName, ...) may create a topic named topicName.
-     * There are two workarounds: either use AdminClient#listTopics and ensure
-     * that the topic is present before describing, or disable
-     * auto.create.topics.enable.
-     *
      * @param topicNames        The names of the topics to describe.
      * @param options           The options to use when describing the topic.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 9fa0cad..199b07a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -282,7 +282,7 @@ public class KafkaAdminClient extends AdminClient {
 
         try {
             metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
             Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
@@ -1144,7 +1144,7 @@ public class KafkaAdminClient extends AdminClient {
         final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
         final ArrayList<String> topicNamesList = new ArrayList<>();
         for (String topicName : topicNames) {
-            if (topicFutures.get(topicName) == null) {
+            if (!topicFutures.containsKey(topicName)) {
                 topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
                 topicNamesList.add(topicName);
             }
@@ -1153,9 +1153,14 @@ public class KafkaAdminClient extends AdminClient {
         runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
             new ControllerNodeProvider()) {
 
+            private boolean supportsDisablingTopicCreation = true;
+
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new MetadataRequest.Builder(topicNamesList);
+                if (supportsDisablingTopicCreation)
+                    return new MetadataRequest.Builder(topicNamesList, false);
+                else
+                    return MetadataRequest.Builder.allTopics();
             }
 
             @Override
@@ -1190,6 +1195,15 @@ public class KafkaAdminClient extends AdminClient {
             }
 
             @Override
+            boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                if (supportsDisablingTopicCreation) {
+                    supportsDisablingTopicCreation = false;
+                    return true;
+                }
+                return false;
+            }
+
+            @Override
             void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
@@ -1208,7 +1222,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new MetadataRequest.Builder(Collections.<String>emptyList());
+                return new MetadataRequest.Builder(Collections.<String>emptyList(), false);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 52d9456..055712e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -659,7 +659,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 this.valueDeserializer = valueDeserializer;
             }
             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
+                    true, false, clusterResourceListeners);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "consumer";
@@ -1352,7 +1353,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 return parts;
 
             Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
-                    new MetadataRequest.Builder(Collections.singletonList(topic)), requestTimeoutMs);
+                    new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs);
             return topicMetadata.get(topic);
         } finally {
             release();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index dc6b911..89a18e3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -265,7 +265,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     ProducerInterceptor.class);
             this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
+                    true, true, clusterResourceListeners);
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 91391e9..383332b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -68,6 +68,16 @@ public class Protocol {
     /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
     public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;
 
+    /* The v4 metadata request has an additional field for allowing auto topic creation. The response is the same as v3. */
+    public static final Schema METADATA_REQUEST_V4 = new Schema(new Field("topics",
+                                                                          ArrayOf.nullable(STRING),
+                                                                         "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."),
+                                                                new Field("allow_auto_topic_creation",
+                                                                          BOOLEAN,
+                                                                          "If this and the broker config 'auto.create.topics.enable' are true, " +
+                                                                          "topics that don't exist will be created by the broker. " +
+                                                                          "Otherwise, no topics will be created by the broker."));
+
     public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
                                                    new Field("host", STRING, "The hostname of the broker."),
                                                    new Field("port", INT32,
@@ -142,8 +152,10 @@ public class Protocol {
              "The broker id of the controller broker."),
          new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
 
-    public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
-    public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
+    public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3;
+
+    public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4};
+    public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4};
 
     /* Produce api */
 
@@ -1181,8 +1193,8 @@ public class Protocol {
             new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."),
             newThrottleTimeField());
 
-    public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
-    public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
+    public static final Schema[] API_VERSIONS_REQUEST = {API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
+    public static final Schema[] API_VERSIONS_RESPONSE = {API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
 
     /* Admin requests common */
     public static final Schema CONFIG_ENTRY = new Schema(new Field("config_name", STRING, "Configuration name"),
@@ -1636,8 +1648,8 @@ public class Protocol {
             new ArrayOf(DESCRIBE_ACLS_RESOURCE),
             "The resources and their associated ACLs."));
 
-    public static final Schema[] DESCRIBE_ACLS_REQUEST = new Schema[] {DESCRIBE_ACLS_REQUEST_V0};
-    public static final Schema[] DESCRIBE_ACLS_RESPONSE  = new Schema[] {DESCRIBE_ACLS_RESPONSE_V0};
+    public static final Schema[] DESCRIBE_ACLS_REQUEST = {DESCRIBE_ACLS_REQUEST_V0};
+    public static final Schema[] DESCRIBE_ACLS_RESPONSE  = {DESCRIBE_ACLS_RESPONSE_V0};
 
     public static final Schema CREATE_ACLS_REQUEST_V0 = new Schema(
         new Field("creations",
@@ -1658,8 +1670,8 @@ public class Protocol {
                 new Field("error_message", NULLABLE_STRING, "The error message.")
             ))));
 
-    public static final Schema[] CREATE_ACLS_REQUEST = new Schema[] {CREATE_ACLS_REQUEST_V0};
-    public static final Schema[] CREATE_ACLS_RESPONSE = new Schema[] {CREATE_ACLS_RESPONSE_V0};
+    public static final Schema[] CREATE_ACLS_REQUEST = {CREATE_ACLS_REQUEST_V0};
+    public static final Schema[] CREATE_ACLS_RESPONSE = {CREATE_ACLS_RESPONSE_V0};
 
     public static final Schema DELETE_ACLS_REQUEST_V0 = new Schema(
         new Field("filters",
@@ -1691,8 +1703,8 @@ public class Protocol {
                 new Field("error_message", NULLABLE_STRING, "The error message."),
                 new Field("matching_acls", new ArrayOf(MATCHING_ACL), "The matching ACLs")))));
 
-    public static final Schema[] DELETE_ACLS_REQUEST = new Schema[] {DELETE_ACLS_REQUEST_V0};
-    public static final Schema[] DELETE_ACLS_RESPONSE = new Schema[] {DELETE_ACLS_RESPONSE_V0};
+    public static final Schema[] DELETE_ACLS_REQUEST = {DELETE_ACLS_REQUEST_V0};
+    public static final Schema[] DELETE_ACLS_RESPONSE = {DELETE_ACLS_RESPONSE_V0};
 
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 3c20139..0493f3d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -30,19 +30,26 @@ import java.util.List;
 
 public class MetadataRequest extends AbstractRequest {
 
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String ALLOW_AUTO_TOPIC_CREATION_KEY_NAME = "allow_auto_topic_creation";
+
     public static class Builder extends AbstractRequest.Builder<MetadataRequest> {
         private static final List<String> ALL_TOPICS = null;
 
         // The list of topics, or null if we want to request metadata about all topics.
         private final List<String> topics;
+        private final boolean allowAutoTopicCreation;
 
         public static Builder allTopics() {
-            return new Builder(ALL_TOPICS);
+            // This never causes auto-creation, but we set the boolean to true because that is the default value when
+            // deserializing V2 and older. This way, the value is consistent after serialization and deserialization.
+            return new Builder(ALL_TOPICS, true);
         }
 
-        public Builder(List<String> topics) {
+        public Builder(List<String> topics, boolean allowAutoTopicCreation) {
             super(ApiKeys.METADATA);
             this.topics = topics;
+            this.allowAutoTopicCreation = allowAutoTopicCreation;
         }
 
         public List<String> topics() {
@@ -55,11 +62,12 @@ public class MetadataRequest extends AbstractRequest {
 
         @Override
         public MetadataRequest build(short version) {
-            if (version < 1) {
-                throw new UnsupportedVersionException("MetadataRequest " +
-                        "versions older than 1 are not supported.");
-            }
-            return new MetadataRequest(this.topics, version);
+            if (version < 1)
+                throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported.");
+            if (!allowAutoTopicCreation && version < 4)
+                throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
+                        "allowAutoTopicCreation field");
+            return new MetadataRequest(this.topics, allowAutoTopicCreation, version);
         }
 
         @Override
@@ -77,18 +85,18 @@ public class MetadataRequest extends AbstractRequest {
         }
     }
 
-    private static final String TOPICS_KEY_NAME = "topics";
-
     private final List<String> topics;
+    private final boolean allowAutoTopicCreation;
 
     /**
      * In v0 null is not allowed and an empty list indicates requesting all topics.
      * Note: modern clients do not support sending v0 requests.
      * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics.
      */
-    public MetadataRequest(List<String> topics, short version) {
+    public MetadataRequest(List<String> topics, boolean allowAutoTopicCreation, short version) {
         super(version);
         this.topics = topics;
+        this.allowAutoTopicCreation = allowAutoTopicCreation;
     }
 
     public MetadataRequest(Struct struct, short version) {
@@ -102,6 +110,10 @@ public class MetadataRequest extends AbstractRequest {
         } else {
             topics = null;
         }
+        if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME))
+            allowAutoTopicCreation = struct.getBoolean(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME);
+        else
+            allowAutoTopicCreation = true;
     }
 
     @Override
@@ -122,6 +134,7 @@ public class MetadataRequest extends AbstractRequest {
             case 2:
                 return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
             case 3:
+            case 4:
                 return new MetadataResponse(throttleTimeMs, Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@@ -137,6 +150,10 @@ public class MetadataRequest extends AbstractRequest {
         return topics;
     }
 
+    public boolean allowAutoTopicCreation() {
+        return allowAutoTopicCreation;
+    }
+
     public static MetadataRequest parse(ByteBuffer buffer, short version) {
         return new MetadataRequest(ApiKeys.METADATA.parseRequest(version, buffer), version);
     }
@@ -148,6 +165,8 @@ public class MetadataRequest extends AbstractRequest {
             struct.set(TOPICS_KEY_NAME, null);
         else
             struct.set(TOPICS_KEY_NAME, topics.toArray());
+        if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME))
+            struct.set(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME, allowAutoTopicCreation);
         return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 0c87fc7..407eb9f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -45,8 +45,8 @@ public class MetadataTest {
 
     private long refreshBackoffMs = 100;
     private long metadataExpireMs = 1000;
-    private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
-    private AtomicReference<String> backgroundError = new AtomicReference<String>();
+    private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
+    private AtomicReference<String> backgroundError = new AtomicReference<>();
 
     @After
     public void tearDown() {
@@ -96,7 +96,7 @@ public class MetadataTest {
         }
 
         long largerOfBackoffAndExpire = Math.max(refreshBackoffMs, metadataExpireMs);
-        Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
+        Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
 
         assertEquals(0, metadata.timeToNextUpdate(now));
 
@@ -255,7 +255,7 @@ public class MetadataTest {
         MockClusterResourceListener mockClusterListener = new MockClusterResourceListener();
         ClusterResourceListeners listeners = new ClusterResourceListeners();
         listeners.maybeAdd(mockClusterListener);
-        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, listeners);
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, listeners);
 
         String hostName = "www.example.com";
         Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002)));
@@ -348,7 +348,7 @@ public class MetadataTest {
 
     @Test
     public void testTopicExpiry() throws Exception {
-        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, true, new ClusterResourceListeners());
 
         // Test that topic is expired if not used within the expiry interval
         long time = 0;
@@ -380,7 +380,7 @@ public class MetadataTest {
 
     @Test
     public void testNonExpiringMetadata() throws Exception {
-        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, new ClusterResourceListeners());
 
         // Test that topic is not expired if not used within the expiry interval
         long time = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index c87acd7..0de76a1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -49,7 +49,7 @@ public class NetworkClientTest {
     protected final int requestTimeoutMs = 1000;
     protected final MockTime time = new MockTime();
     protected final MockSelector selector = new MockSelector(time);
-    protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
     protected final int nodeId = 1;
     protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
     protected final Node node = cluster.nodes().get(0);
@@ -86,8 +86,7 @@ public class NetworkClientTest {
 
     @Test(expected = IllegalStateException.class)
     public void testSendToUnreadyNode() {
-        MetadataRequest.Builder builder =
-                new MetadataRequest.Builder(Arrays.asList("test"));
+        MetadataRequest.Builder builder = new MetadataRequest.Builder(Arrays.asList("test"), true);
         long now = time.milliseconds();
         ClientRequest request = client.newClientRequest("5", builder, now, false);
         client.send(request, now);
@@ -251,8 +250,7 @@ public class NetworkClientTest {
         // metadata request when the remote node disconnects with the request in-flight.
         awaitReady(client, node);
 
-        MetadataRequest.Builder builder =
-                new MetadataRequest.Builder(Collections.<String>emptyList());
+        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.<String>emptyList(), true);
         long now = time.milliseconds();
         ClientRequest request = client.newClientRequest(node.idString(), builder, now, true);
         client.send(request, now);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
index ba7b528..6c1fd17 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
@@ -50,7 +50,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable {
         this.adminClientConfig = new AdminClientConfig(config);
         this.cluster = cluster;
         this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
         this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
         this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 1249896..f918a34 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -366,7 +366,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -407,7 +407,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -448,7 +448,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -484,7 +484,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -534,7 +534,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -581,7 +581,7 @@ public class KafkaConsumerTest {
         topicMetadata.put(unmatchedTopic, 1);
 
         Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         Node node = cluster.nodes().get(0);
 
         MockClient client = new MockClient(time, metadata);
@@ -621,7 +621,7 @@ public class KafkaConsumerTest {
         topicMetadata.put(otherTopic, 1);
 
         Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         Node node = cluster.nodes().get(0);
 
         MockClient client = new MockClient(time, metadata);
@@ -664,7 +664,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -720,7 +720,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         final Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         final MockClient client = new MockClient(time, metadata);
@@ -760,7 +760,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -808,7 +808,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(tpCounts);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -928,7 +928,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(tpCounts);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -996,7 +996,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(tpCounts);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -1061,7 +1061,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(tpCounts);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -1122,7 +1122,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(topic, 2);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -1268,7 +1268,7 @@ public class KafkaConsumerTest {
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        Metadata metadata = createMetadata();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
@@ -1372,6 +1372,10 @@ public class KafkaConsumerTest {
         };
     }
 
+    private Metadata createMetadata() {
+        return new Metadata(0, Long.MAX_VALUE, true);
+    }
+
     private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 4779f43..8a93439 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -73,7 +73,7 @@ public class AbstractCoordinatorTest {
         this.mockTime = new MockTime();
         this.mockClient = new MockClient(mockTime);
 
-        Metadata metadata = new Metadata();
+        Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true);
         this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, mockTime,
                 RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS);
         Metrics metrics = new Metrics();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 770d4f7..7d22351 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -119,7 +119,7 @@ public class ConsumerCoordinatorTest {
     public void setup() {
         this.time = new MockTime();
         this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
-        this.metadata = new Metadata(0, Long.MAX_VALUE);
+        this.metadata = new Metadata(0, Long.MAX_VALUE, true);
         this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         this.client = new MockClient(time, metadata);
         this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 2a6b228..b46b657 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -46,7 +46,7 @@ public class ConsumerNetworkClientTest {
     private MockClient client = new MockClient(time);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 97a6259..24eeb6f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -118,7 +118,7 @@ public class FetcherTest {
     private int fetchSize = 1000;
     private long retryBackoffMs = 100;
     private MockTime time = new MockTime(1);
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
     private MockClient client = new MockClient(time, metadata);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
     private Node node = cluster.nodes().get(0);
@@ -1068,16 +1068,15 @@ public class FetcherTest {
     public void testGetTopicMetadataInvalidTopic() {
         client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION));
         fetcher.getTopicMetadata(
-            new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L);
+                new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L);
     }
 
     @Test
     public void testGetTopicMetadataUnknownTopic() {
         client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));
 
-        Map<String, List<PartitionInfo>> topicMetadata =
-                fetcher.getTopicMetadata(
-                        new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L);
+        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
+                new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L);
         assertNull(topicMetadata.get(topicName));
     }
 
@@ -1086,9 +1085,8 @@ public class FetcherTest {
         client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE));
         client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
 
-        Map<String, List<PartitionInfo>> topicMetadata =
-                fetcher.getTopicMetadata(
-                        new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L);
+        Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
+                new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L);
         assertTrue(topicMetadata.containsKey(topicName));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 3a6426a..e2fe614 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -325,7 +325,8 @@ public class KafkaProducerTest {
         KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
         long refreshBackoffMs = 500L;
         long metadataExpireMs = 60000L;
-        final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners());
+        final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true,
+                true, new ClusterResourceListeners());
         final Time time = new MockTime();
         MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
         MemberModifier.field(KafkaProducer.class, "time").set(producer, time);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index faa6ea5..c1c5a2e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -87,7 +87,7 @@ public class SenderTest {
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
     private int batchSize = 16 * 1024;
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners());
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
     private ApiVersions apiVersions = new ApiVersions();
     private Cluster cluster = TestUtils.singletonCluster("test", 2);
     private Metrics metrics = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index ed7ec84..3e3f785 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -92,7 +92,7 @@ public class TransactionManagerTest {
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
 
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners());
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
     private ApiVersions apiVersions = new ApiVersions();
     private Cluster cluster = TestUtils.singletonCluster("test", 2);
     private RecordAccumulator accumulator = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 56f0215..a05b680 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -117,6 +117,10 @@ public class RequestResponseTest {
         checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException());
         checkResponse(createMetadataResponse(), 2);
         checkErrorResponse(createMetadataRequest(2, singletonList("topic1")), new UnknownServerException());
+        checkResponse(createMetadataResponse(), 3);
+        checkErrorResponse(createMetadataRequest(3, singletonList("topic1")), new UnknownServerException());
+        checkResponse(createMetadataResponse(), 4);
+        checkErrorResponse(createMetadataRequest(4, singletonList("topic1")), new UnknownServerException());
         checkRequest(createOffsetCommitRequest(2));
         checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
         checkResponse(createOffsetCommitResponse(), 0);
@@ -703,7 +707,7 @@ public class RequestResponseTest {
     }
 
     private MetadataRequest createMetadataRequest(int version, List<String> topics) {
-        return new MetadataRequest.Builder(topics).build((short) version);
+        return new MetadataRequest.Builder(topics, true).build((short) version);
     }
 
     private MetadataResponse createMetadataResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 28402f0..631ae08 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -548,10 +548,10 @@ public class SaslAuthenticatorTest {
         // Send metadata request before Kafka SASL handshake request
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        MetadataRequest metadataRequest1 =
-                new MetadataRequest.Builder(Collections.singletonList("sometopic")).build();
-        RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id,
-                metadataRequest1.version(), "someclient", 1);
+        MetadataRequest metadataRequest1 = new MetadataRequest.Builder(Collections.singletonList("sometopic"),
+                true).build();
+        RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest1.version(),
+                "someclient", 1);
         selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
         NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
         selector.close();
@@ -563,8 +563,7 @@ public class SaslAuthenticatorTest {
         String node2 = "invalid2";
         createClientConnection(SecurityProtocol.PLAINTEXT, node2);
         sendHandshakeRequestReceiveResponse(node2);
-        MetadataRequest metadataRequest2 =
-                new MetadataRequest.Builder(Collections.singletonList("sometopic")).build();
+        MetadataRequest metadataRequest2 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
         RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id,
                 metadataRequest2.version(), "someclient", 2);
         selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 24d321e..62e2fc1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -87,7 +87,7 @@ public class WorkerGroupMember {
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "connect";

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index ab042de..edef7dc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -90,7 +90,7 @@ public class WorkerCoordinatorTest {
     public void setup() {
         this.time = new MockTime();
         this.client = new MockClient(time);
-        this.metadata = new Metadata(0, Long.MAX_VALUE);
+        this.metadata = new Metadata(0, Long.MAX_VALUE, true);
         this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
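Both Connect hunks use the new three-argument `Metadata` constructor. A short sketch of the convention this patch establishes (the backoff and expiry values are illustrative):

    import org.apache.kafka.clients.Metadata;

    public class MetadataFlagSketch {
        public static void main(String[] args) {
            // Args: retryBackoffMs, metadataExpireMs, allowAutoTopicCreation.
            // true  -> producer, consumer and Connect workers (old behaviour);
            // false -> AdminClient and StreamsKafkaClient, which create topics
            //          explicitly instead of as a metadata side effect.
            Metadata connectStyle = new Metadata(100L, 60 * 60 * 1000L, true);
            Metadata adminStyle = new Metadata(100L, 60 * 60 * 1000L, false);
            System.out.println(connectStyle + " / " + adminStyle);
        }
    }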

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 50198a7..4410e94 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -213,7 +213,7 @@ class AdminClient(val time: Time,
    */
 
   def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = {
-    val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic()).toSet.toList.asJava)
+    val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic).toSet.toList.asJava, true)
     val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse]
     val errors = response.errors
     if (!errors.isEmpty)
@@ -425,7 +425,7 @@ object AdminClient {
   def create(config: AdminConfig): AdminClient = {
     val time = Time.SYSTEM
     val metrics = new Metrics(time)
-    val metadata = new Metadata
+    val metadata = new Metadata(100L, 60 * 60 * 1000L, true)
     val channelBuilder = ClientUtils.createChannelBuilder(config)
     val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
     val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b780823..eb0bf3b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -886,7 +886,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     topicMetadata.headOption.getOrElse(createInternalTopic(topic))
   }
 
-  private def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName,
+                               errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints)
     if (topics.isEmpty || topicResponses.size == topics.size) {
       topicResponses
@@ -899,7 +900,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, true, java.util.Collections.emptyList())
           else
             topicMetadata
-        } else if (config.autoCreateTopicsEnable) {
+        } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
           createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
           new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList())
@@ -937,7 +938,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     if (authorizedTopics.nonEmpty) {
       val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
-      if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
+      if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
         if (!authorize(request.session, Create, Resource.ClusterResource)) {
           authorizedTopics --= nonExistingTopics
           unauthorizedForCreateTopics ++= nonExistingTopics
@@ -965,7 +966,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorizedTopics.isEmpty)
         Seq.empty[MetadataResponse.TopicMetadata]
       else
-        getTopicMetadata(authorizedTopics, request.listenerName, errorUnavailableEndpoints)
+        getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.listenerName,
+          errorUnavailableEndpoints)
 
     val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
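The broker-side rule introduced above is a plain conjunction, with the extra wrinkle that a client lacking Create on the cluster resource is filtered out before the gate is reached. A simplified restatement in Java (method and parameter names are mine, not the patch's):

    public class AutoCreateGateSketch {
        // A missing topic is auto-created only if the request opted in
        // (V4+ can opt out; pre-V4 requests cannot) and the broker has
        // auto.create.topics.enable=true.
        static boolean shouldAutoCreate(boolean topicExists,
                                        boolean allowAutoTopicCreation,
                                        boolean autoCreateTopicsEnable) {
            return !topicExists && allowAutoTopicCreation && autoCreateTopicsEnable;
        }

        public static void main(String[] args) {
            System.out.println(shouldAutoCreate(false, false, true)); // false: AdminClient lookup
            System.out.println(shouldAutoCreate(false, true, true));  // true: producer/consumer lookup
        }
    }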
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 017c57f..dce5da2 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -222,8 +222,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     super.tearDown()
   }
 
-  private def createMetadataRequest = {
-    new requests.MetadataRequest.Builder(List(topic).asJava).build()
+  private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
+    new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build()
   }
 
   private def createProduceRequest = {
@@ -328,7 +328,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testAuthorizationWithTopicExisting() {
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
-      ApiKeys.METADATA -> createMetadataRequest,
+      ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
       ApiKeys.PRODUCE -> createProduceRequest,
       ApiKeys.FETCH -> createFetchRequest,
       ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
@@ -381,6 +381,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
 
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
+      ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
+      ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false),
       ApiKeys.PRODUCE -> createProduceRequest,
       ApiKeys.FETCH -> createFetchRequest,
       ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
@@ -397,7 +399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       val resourceToAcls = requestKeysToAcls(key)
       resourceToAcls.get(topicResource).foreach { acls =>
         val describeAcls = topicDescribeAcl(topicResource)
-        val isAuthorized =  describeAcls == acls
+        val isAuthorized = describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
         sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
         removeAllAcls()

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index 065759f..0e21da7 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -30,8 +30,7 @@ import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.KafkaFuture
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException}
-import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException}
+import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.MetadataResponse
@@ -80,7 +79,7 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
 
   def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
     TestUtils.waitUntilTrue(() => {
-        val topics = client.listTopics().names().get()
+        val topics = client.listTopics.names.get()
         expectedPresent.forall(topicName => topics.contains(topicName)) &&
           expectedMissing.forall(topicName => !topics.contains(topicName))
       }, "timed out waiting for topics")
@@ -123,21 +122,39 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
     val topics = Seq("mytopic", "mytopic2")
     val newTopics = topics.map(new NewTopic(_, 1, 1))
     client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all.get()
-    waitForTopics(client, List(), List("mytopic", "mytopic2"))
+    waitForTopics(client, List(), topics)
 
     client.createTopics(newTopics.asJava).all.get()
-    waitForTopics(client, List("mytopic", "mytopic2"), List())
+    waitForTopics(client, topics, List())
 
     val results = client.createTopics(newTopics.asJava).results()
     assertTrue(results.containsKey("mytopic"))
     assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
     assertTrue(results.containsKey("mytopic2"))
     assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException])
-    val topicsFromDescribe = client.describeTopics(Seq("mytopic", "mytopic2").asJava).all.get().asScala.keys
+    val topicsFromDescribe = client.describeTopics(topics.asJava).all.get().asScala.keys
     assertEquals(topics.toSet, topicsFromDescribe)
 
     client.deleteTopics(topics.asJava).all.get()
-    waitForTopics(client, List(), List("mytopic", "mytopic2"))
+    waitForTopics(client, List(), topics)
+  }
+
+  /**
+    * describeTopics should not trigger auto topic creation
+    */
+  @Test
+  def testDescribeNonExistingTopic(): Unit = {
+    client = AdminClient.create(createConfig())
+
+    val existingTopic = "existing-topic"
+    client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1)).asJava).all.get()
+    waitForTopics(client, Seq(existingTopic), List())
+
+    val nonExistingTopic = "non-existing"
+    val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).results
+    assertEquals(existingTopic, results.get(existingTopic).get.name)
+    assertTrue(intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException])
+    assertEquals(None, zkUtils.getTopicPartitionCount(nonExistingTopic))
   }
 
   @Test
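The new test pins the user-visible contract: describing a missing topic now fails rather than creating it. A hedged sketch of the same interaction from application code (bootstrap address and topic name are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;

    public class DescribeSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            AdminClient admin = AdminClient.create(props);
            try {
                admin.describeTopics(Collections.singletonList("non-existing")).all().get();
            } catch (ExecutionException e) {
                // With this patch the cause is UnknownTopicOrPartitionException;
                // the broker no longer creates "non-existing" as a side effect.
                System.out.println("describe failed: " + e.getCause());
            } finally {
                admin.close();
            }
        }
    }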

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 8e6b11d..0ef3405 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -43,7 +43,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
 
       def verifyMetadata(socketServer: SocketServer) = {
         val metadata = sendMetadataRequest(
-          new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
+          new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala
         val metadataForTopic = metadata.filter(_.topic == topic).head
 
         val partitions = if (!details.replicasAssignments.isEmpty)
@@ -127,7 +127,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
   protected def validateTopicExists(topic: String): Unit = {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     val metadata = sendMetadataRequest(
-      new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
+      new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala
     assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 9cd53d8..881bf8e 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -106,7 +106,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
 
   private def validateTopicIsDeleted(topic: String): Unit = {
     val metadata = sendMetadataRequest(new MetadataRequest.
-        Builder(List(topic).asJava).build).topicMetadata.asScala
+        Builder(List(topic).asJava, true).build).topicMetadata.asScala
     TestUtils.waitUntilTrue (() => !metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE),
       s"The topic $topic should not exist")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index fdc9a95..177a9ee 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -108,19 +108,49 @@ class MetadataRequestTest extends BaseRequestTest {
 
     // v0, Doesn't support a "no topics" request
     // v1, Empty list represents "no topics"
-    val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava, 1.toShort))
+    val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 1.toShort))
     assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
     assertTrue("Response should have no topics", metadataResponse.topicMetadata.isEmpty)
   }
 
   @Test
+  def testAutoTopicCreation(): Unit = {
+    def checkAutoCreatedTopic(existingTopic: String, autoCreatedTopic: String, response: MetadataResponse): Unit = {
+      assertNull(response.errors.get(existingTopic))
+      assertEquals(Errors.LEADER_NOT_AVAILABLE, response.errors.get(autoCreatedTopic))
+      assertEquals(Some(servers.head.config.numPartitions), zkUtils.getTopicPartitionCount(autoCreatedTopic))
+      for (i <- 0 until servers.head.config.numPartitions)
+        TestUtils.waitUntilMetadataIsPropagated(servers, autoCreatedTopic, i)
+    }
+
+    val topic1 = "t1"
+    val topic2 = "t2"
+    val topic3 = "t3"
+    val topic4 = "t4"
+    TestUtils.createTopic(zkUtils, topic1, 1, 1, servers)
+
+    val response1 = sendMetadataRequest(new MetadataRequest(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion))
+    checkAutoCreatedTopic(topic1, topic2, response1)
+
+    // V3 doesn't support a configurable allowAutoTopicCreation, so the fact that we set it to `false` has no effect
+    val response2 = sendMetadataRequest(new MetadataRequest(Seq(topic2, topic3).asJava, false, 3))
+    checkAutoCreatedTopic(topic2, topic3, response2)
+
+    // V4 and higher support a configurable allowAutoTopicCreation
+    val response3 = sendMetadataRequest(new MetadataRequest(Seq(topic3, topic4).asJava, false, 4))
+    assertNull(response3.errors.get(topic3))
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4))
+    assertEquals(None, zkUtils.getTopicPartitionCount(topic4))
+  }
+
+  @Test
   def testAllTopicsRequest() {
     // create some topics
     TestUtils.createTopic(zkUtils, "t1", 3, 2, servers)
     TestUtils.createTopic(zkUtils, "t2", 3, 2, servers)
 
     // v0, Empty list represents all topics
-    val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, 0.toShort))
+    val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 0.toShort))
     assertTrue("V0 Response should have no errors", metadataResponseV0.errors.isEmpty)
     assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size())
 
@@ -139,7 +169,7 @@ class MetadataRequestTest extends BaseRequestTest {
     TestUtils.createTopic(zkUtils, replicaDownTopic, 1, replicaCount, servers)
 
     // Kill a replica node that is not the leader
-    val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort))
+    val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
     val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     val downNode = servers.find { server =>
       val serverId = server.apis.brokerId
@@ -150,14 +180,14 @@ class MetadataRequestTest extends BaseRequestTest {
     downNode.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort))
+      val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
       val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
       val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get
       replica.host == "" & replica.port == -1
     }, "Replica was not found down", 5000)
 
     // Validate version 0 still filters unavailable replicas and contains error
-    val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 0.toShort))
+    val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 0.toShort))
     val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
     assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty)
     assertFalse(s"The downed broker should not be in the brokers list", v0BrokerIds.contains(downNode))
@@ -167,7 +197,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicas.size == replicaCount - 1)
 
     // Validate version 1 returns unavailable replicas with no error
-    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort))
+    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
     val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
     assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
     assertFalse(s"The downed broker should not be in the brokers list", v1BrokerIds.contains(downNode))
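The version matrix that `testAutoTopicCreation` walks through reduces to one predicate. A simplified restatement, assuming the broker itself has auto.create.topics.enable=true (the helper name is illustrative):

    public class VersionGateSketch {
        // V0-V3 carry no allow_auto_topic_creation field, so the broker
        // behaves as if the client had allowed creation; only V4+ can opt out.
        static boolean brokerMayAutoCreate(short version, boolean allowAutoTopicCreation) {
            return version < 4 || allowAutoTopicCreation;
        }

        public static void main(String[] args) {
            System.out.println(brokerMayAutoCreate((short) 3, false)); // true
            System.out.println(brokerMayAutoCreate((short) 4, false)); // false
        }
    }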

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index b261cb2..3b0e93c 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -173,7 +173,7 @@ class RequestQuotaTest extends BaseRequestTest {
           FetchRequest.Builder.forConsumer(0, 0, partitionMap)
 
         case ApiKeys.METADATA =>
-          new MetadataRequest.Builder(List(topic).asJava)
+          new MetadataRequest.Builder(List(topic).asJava, true)
 
         case ApiKeys.LIST_OFFSETS =>
           ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7311dcbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 44f7900..b1c3f2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -101,7 +101,8 @@ public class StreamsKafkaClient {
 
         final Metadata metadata = new Metadata(streamsConfig.getLong(
             StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
-            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG)
+            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
+            false
         );
         final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
         metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
@@ -241,7 +242,8 @@ public class StreamsKafkaClient {
     private String getAnyReadyBrokerId() {
         final Metadata metadata = new Metadata(
             streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
-            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
+            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
+            false);
         final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
         metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), Time.SYSTEM.milliseconds());
 
@@ -289,7 +291,7 @@ public class StreamsKafkaClient {
 
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
             getAnyReadyBrokerId(),
-            new MetadataRequest.Builder(null),
+            MetadataRequest.Builder.allTopics(),
             Time.SYSTEM.milliseconds(),
             true);
         final ClientResponse clientResponse = sendRequest(clientRequest);
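The last hunk swaps the old `new MetadataRequest.Builder(null)` idiom for an explicit factory. A minimal sketch of requesting metadata for every topic after this patch; an all-topics request has nothing to auto-create on the broker, so the factory takes no flag:

    import org.apache.kafka.common.requests.MetadataRequest;

    public class AllTopicsSketch {
        public static void main(String[] args) {
            // Builder.allTopics() marks the request as an all-topics request
            // instead of relying on a null topic list.
            MetadataRequest allTopics = MetadataRequest.Builder.allTopics().build();
            System.out.println(allTopics);
        }
    }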

